Skip to content

Commit

Permalink
kafka: support new log attrs for kafka server
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored and 4eUeP committed May 17, 2024
1 parent 4c6a13c commit ed8a51a
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 0 deletions.
3 changes: 3 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ kafka:
# fetch-mode: 1 # TODO: Currently, only mode 1 is supported
# fetch-reader-timeout: 50 # 50ms, default timeout of each read, 0 means nonblocking
# fetch-maxlen: 1000 # default max size of each read
# scd-enabled: false # enable Single Copy Delivery mode, default is false
# local-scd-enabled: false
# sticky-copysets: false # enable sticky copyset, default is false

# Configuration for HStream Store
# The configuration for hstore is **Optional**. When the values are not provided,
Expand Down
3 changes: 3 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ parseJSONToOptions CliOptions{..} obj = do
storageCfg <- nodeCfgObj .:? "storage" .!= mempty
fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50
fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000
scdEnabled <- storageCfg .:? "scd-enabled" .!= False
localScdEnabled <- storageCfg .:? "local-scd-enabled" .!= False
stickyCopysets <- storageCfg .:? "sticky-copysets" .!= False
let _storage = StorageOptions{..}

-- SASL config
Expand Down
3 changes: 3 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ parseMetaStoreAddr t =
data StorageOptions = StorageOptions
{ fetchReaderTimeout :: Int
, fetchMaxLen :: Int
, scdEnabled :: Bool
, localScdEnabled :: Bool
, stickyCopysets :: Bool
} deriving (Show, Eq)

data ExperimentalFeature
Expand Down
5 changes: 5 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import GHC.Stack (HasCallStack)

import qualified HStream.Base.Time as BaseTime
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Config.Types (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand Down Expand Up @@ -53,6 +55,9 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
attrs = S.def { S.logReplicationFactor = S.defAttr1 replica
, S.logAttrsExtras = extraAttr
, S.logBacklogDuration = S.defAttr1 (getBacklogDuration topicConfigs)
, S.logScdEnabled = S.defAttr1 serverOpts._storage.scdEnabled
, S.logLocalScdEnabled = S.defAttr1 serverOpts._storage.localScdEnabled
, S.logStickyCopySets = S.defAttr1 serverOpts._storage.stickyCopysets
}
try (S.createStream scLDClient streamId attrs) >>= \case
Left (e :: SomeException)
Expand Down

0 comments on commit ed8a51a

Please sign in to comment.