Skip to content

Commit

Permalink
Experimental SpareOffset 1: ListOffsets handler
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 17, 2024
1 parent 10ee123 commit c20d3d7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .stylish-haskell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ language_extensions:
- TemplateHaskell
- TypeApplications
- QuasiQuotes
- CPP
- PatternSynonyms

# Attempt to find the cabal file in ancestors of the current directory, and
# parse options (currently only language extensions) from that.
Expand Down
48 changes: 34 additions & 14 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE PatternSynonyms #-}
#ifndef HSTREAM_SPARSE_OFFSET
{-# LANGUAGE PatternSynonyms #-}

module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommit
, handleOffsetFetch
, handleListOffsets
)
where
) where
#endif

import qualified Control.Exception as E
import Data.Int (Int64)
Expand All @@ -17,9 +17,7 @@ import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.KafkaException as K
import qualified HStream.Kafka.Common.Metrics as Metrics
import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOffsetByTimestamp,
getOldestOffset)
import qualified HStream.Kafka.Common.OffsetManager as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM)
import qualified HStream.Kafka.Group.Group as G
Expand All @@ -42,11 +40,20 @@ pattern EarliestTimestamp = (-2)

-- FIXME: This function does not handle any ErrorCodeException.
-- Modify the following 'listOffsetTopicPartitions' to fix it.
handleListOffsets :: ServerContext
-> K.RequestContext
-> K.ListOffsetsRequest
-> IO K.ListOffsetsResponse
#ifndef HSTREAM_SPARSE_OFFSET
handleListOffsets
#else
handleListOffsetsSparseOffset
#endif
:: ServerContext
-> K.RequestContext
-> K.ListOffsetsRequest
-> IO K.ListOffsetsResponse
#ifndef HSTREAM_SPARSE_OFFSET
handleListOffsets sc reqCtx req = do
#else
handleListOffsetsSparseOffset sc reqCtx req = do
#endif
topicResps <- forKaArrayM req.topics $ \listOffsetsTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) sc.authorizer Res_TOPIC listOffsetsTopic.name AclOp_DESCRIBE >>= \case
Expand Down Expand Up @@ -109,18 +116,31 @@ handleListOffsets sc reqCtx req = do
, name = listOffsetsTopic.name
}

#ifndef HSTREAM_SPARSE_OFFSET
-- NOTE: The last offset of a partition is the offset of the upcoming
-- message, i.e. the offset of the last available message + 1.
getOffset logid LatestTimestamp =
maybe 0 (+ 1) <$> getLatestOffset sc.scOffsetManager logid
maybe 0 (+ 1) <$> K.getLatestOffset sc.scOffsetManager logid
getOffset logid EarliestTimestamp =
fromMaybe 0 <$> K.getOldestOffset sc.scOffsetManager logid
-- Return the earliest offset whose timestamp is greater than or equal to
-- the given timestamp.
--
-- TODO: actually, this is not supported currently.
getOffset logid timestamp =
fromMaybe (-1) <$> K.getOffsetByTimestamp sc.scOffsetManager logid timestamp
#else
getOffset logid LatestTimestamp =
maybe 0 K.calNextSparseOffset <$> K.getLatestHeadSparseOffset sc.scOffsetManager logid
getOffset logid EarliestTimestamp =
fromMaybe 0 <$> getOldestOffset sc.scOffsetManager logid
fromMaybe 0 <$> K.getOldestSparseOffset sc.scOffsetManager logid
-- Return the earliest offset whose timestamp is greater than or equal to
-- the given timestamp.
--
-- TODO: actually, this is not supported currently.
getOffset logid timestamp =
fromMaybe (-1) <$> getOffsetByTimestamp sc.scOffsetManager logid timestamp
fromMaybe (-1) <$> K.getSparseOffsetByTimestamp sc.scOffsetManager logid timestamp
#endif

--------------------
-- 8: OffsetCommit
Expand Down
10 changes: 10 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{-# LANGUAGE PatternSynonyms #-}
{-# OPTIONS_GHC -Wno-unused-top-binds #-}

module HStream.Kafka.Server.Handler.SparseOffset.Offset
( handleListOffsetsSparseOffset
) where

#define HSTREAM_SPARSE_OFFSET
#include "HStream/Kafka/Server/Handler/Offset.hs"
#undef HSTREAM_SPARSE_OFFSET
6 changes: 4 additions & 2 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ library
HStream.Kafka.Server.Handler.Group
HStream.Kafka.Server.Handler.Offset
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.SparseOffset.Offset
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Security.SASL

Expand All @@ -193,7 +194,7 @@ library

extra-libraries: rdkafka++
include-dirs:
include /usr/local/include external/asio/asio/include include
. include /usr/local/include external/asio/asio/include

extra-lib-dirs: /usr/local/lib
hs-source-dirs: .
Expand Down Expand Up @@ -245,6 +246,7 @@ library

default-language: GHC2021
default-extensions:
CPP
DerivingStrategies
LambdaCase
MultiWayIf
Expand All @@ -262,9 +264,9 @@ test-suite hstream-kafka-test
HStream.Kafka.Common.AclEntrySpec
HStream.Kafka.Common.AclSpec
HStream.Kafka.Common.AuthorizerSpec
HStream.Kafka.Common.ConfigSpec
HStream.Kafka.Common.OffsetManagerSpec
HStream.Kafka.Common.TestUtils
HStream.Kafka.Common.ConfigSpec

hs-source-dirs: tests
build-depends:
Expand Down

0 comments on commit c20d3d7

Please sign in to comment.