Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka batch consumer #217

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion freckle-app/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## [_Unreleased_](https://github.com/freckle/freckle-app/compare/freckle-app-v1.21.0.0...main)
## [_Unreleased_](https://github.com/freckle/freckle-app/compare/freckle-app-v1.21.1.0...main)

## [v1.21.1.0](https://github.com/freckle/freckle-app/compare/freckle-app-v1.21.0.0...freckle-app-v1.21.1.0)

Add `Freckle.App.Kafka.Consumer.runConsumerBatched`

## [v1.21.0.0](https://github.com/freckle/freckle-app/compare/freckle-app-v1.20.3.0...freckle-app-v1.21.0.0)

Expand Down
2 changes: 1 addition & 1 deletion freckle-kafka/freckle-kafka.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.18

-- This file has been generated from package.yaml by hpack version 0.36.0.
-- This file has been generated from package.yaml by hpack version 0.37.0.
--
-- see: https://github.com/sol/hpack

Expand Down
70 changes: 64 additions & 6 deletions freckle-kafka/library/Freckle/App/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Freckle.App.Kafka.Consumer
, KafkaConsumerConfig (..)
, envKafkaConsumerConfig
, runConsumer
, runConsumerBatched
) where

import Prelude
Expand All @@ -18,17 +19,19 @@ import Control.Exception.Annotated.UnliftIO
, displayException
)
import Control.Exception.Annotated.UnliftIO qualified as Annotated
import Control.Lens (Lens', view)
import Control.Lens (Lens', over, view, _Left)
import Control.Monad (forever, (<=<))
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Reader (MonadReader)
import Data.Aeson
import Data.ByteString (ByteString)
import Data.Foldable (for_)
import Data.List.NonEmpty (NonEmpty)
import Data.Either (partitionEithers)
import Data.Foldable (for_, traverse_)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import Data.List.NonEmpty qualified as NE
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
Expand Down Expand Up @@ -198,7 +201,7 @@ runConsumer pollTimeout onMessage =
"kafka.consumer"
(defaultSpanArguments {Trace.kind = Consumer})
$ do
mRecord <- fromKafkaError =<< pollMessage consumer kTimeout
mRecord <- fromKafkaError =<< pollMessage consumer (kafkaTimeout pollTimeout)

for_ (crValue =<< mRecord) $ \bs -> do
a <-
Expand All @@ -207,8 +210,6 @@ runConsumer pollTimeout onMessage =
eitherDecodeStrict bs
inSpan "kafka.consumer.message.handle" defaultSpanArguments $ onMessage a
where
kTimeout = Kafka.Timeout $ timeoutMs pollTimeout

handlers =
[ Annotated.Handler $
logErrorNS "kafka"
Expand All @@ -220,6 +221,63 @@ runConsumer pollTimeout onMessage =
(const "Could not decode message value")
]

data BatchConsumerError
= BatchConsumerDecodeError KafkaMessageDecodeError
| BatchConsumerKafkaError KafkaError
deriving stock (Show)

runConsumerBatched
:: forall a m env
. ( MonadUnliftIO m
, MonadReader env m
, MonadLogger m
, MonadTracer m
, HasKafkaConsumer env
, FromJSON a
, HasCallStack
)
=> Timeout
-> BatchSize
-> ([a] -> m ())
-> m ()
runConsumerBatched pollTimeout batchSize onBatch =
forever $ do
consumer <- view kafkaConsumerL

errors <- inSpan
"kafka.batchConsumer"
(defaultSpanArguments {Trace.kind = Consumer})
$ do
(errors, records) <- do
(kafkaErrors, batch) <-
partitionEithers
<$> pollMessageBatch consumer (kafkaTimeout pollTimeout) batchSize
(decodeErrors, records) <- fmap partitionEithers $ inSpan "kafka.batchConsumer.messages.decode" defaultSpanArguments $ do
let errorDecode bs = over _Left (KafkaMessageDecodeError bs) $ eitherDecodeStrict @a bs
pure $ mapMaybe (fmap errorDecode . crValue) batch
pure
( fmap BatchConsumerKafkaError kafkaErrors
<> fmap BatchConsumerDecodeError decodeErrors
, records
)

inSpan "kafka.batchConsumer.messages.handle" defaultSpanArguments $
onBatch records
pure $ nonEmpty errors

traverse_ logErrors errors
where
displayConsumerError = \case
BatchConsumerDecodeError e -> displayException e
BatchConsumerKafkaError e -> displayException e
logErrors errors =
logErrorNS "kafka" $
"Batch consumer errors"
:# ["errors" .= fmap displayConsumerError errors]

kafkaTimeout :: Timeout -> Kafka.Timeout
kafkaTimeout = Kafka.Timeout . timeoutMs

-- | Like 'annotatedExceptionMessage', but use the supplied function to
-- construct an initial 'Message' that it will augment.
annotatedExceptionMessageFrom
Expand Down
Loading