@@ -6,6 +6,7 @@ module Freckle.App.Kafka.Consumer
6
6
, KafkaConsumerConfig (.. )
7
7
, envKafkaConsumerConfig
8
8
, runConsumer
9
+ , runConsumerBatched
9
10
) where
10
11
11
12
import Prelude
@@ -18,17 +19,19 @@ import Control.Exception.Annotated.UnliftIO
18
19
, displayException
19
20
)
20
21
import Control.Exception.Annotated.UnliftIO qualified as Annotated
21
- import Control.Lens (Lens' , view )
22
+ import Control.Lens (Lens' , over , view , _Left )
22
23
import Control.Monad (forever , (<=<) )
23
24
import Control.Monad.IO.Class (MonadIO )
24
25
import Control.Monad.Reader (MonadReader )
25
26
import Data.Aeson
26
27
import Data.ByteString (ByteString )
27
- import Data.Foldable (for_ )
28
- import Data.List.NonEmpty (NonEmpty )
28
+ import Data.Either (partitionEithers )
29
+ import Data.Foldable (for_ , traverse_ )
30
+ import Data.List.NonEmpty (NonEmpty , nonEmpty )
29
31
import Data.List.NonEmpty qualified as NE
30
32
import Data.Map.Strict (Map )
31
33
import Data.Map.Strict qualified as Map
34
+ import Data.Maybe (mapMaybe )
32
35
import Data.Text (Text )
33
36
import Data.Text qualified as T
34
37
import Data.Text.Encoding qualified as T
@@ -198,7 +201,7 @@ runConsumer pollTimeout onMessage =
198
201
" kafka.consumer"
199
202
(defaultSpanArguments {Trace. kind = Consumer })
200
203
$ do
201
- mRecord <- fromKafkaError =<< pollMessage consumer kTimeout
204
+ mRecord <- fromKafkaError =<< pollMessage consumer (kafkaTimeout pollTimeout)
202
205
203
206
for_ (crValue =<< mRecord) $ \ bs -> do
204
207
a <-
@@ -207,8 +210,6 @@ runConsumer pollTimeout onMessage =
207
210
eitherDecodeStrict bs
208
211
inSpan " kafka.consumer.message.handle" defaultSpanArguments $ onMessage a
209
212
where
210
- kTimeout = Kafka. Timeout $ timeoutMs pollTimeout
211
-
212
213
handlers =
213
214
[ Annotated. Handler $
214
215
logErrorNS " kafka"
@@ -220,6 +221,63 @@ runConsumer pollTimeout onMessage =
220
221
(const " Could not decode message value" )
221
222
]
222
223
224
+ data BatchConsumerError
225
+ = BatchConsumerDecodeError KafkaMessageDecodeError
226
+ | BatchConsumerKafkaError KafkaError
227
+ deriving stock (Show )
228
+
229
+ runConsumerBatched
230
+ :: forall a m env
231
+ . ( MonadUnliftIO m
232
+ , MonadReader env m
233
+ , MonadLogger m
234
+ , MonadTracer m
235
+ , HasKafkaConsumer env
236
+ , FromJSON a
237
+ , HasCallStack
238
+ )
239
+ => Timeout
240
+ -> BatchSize
241
+ -> ([a ] -> m () )
242
+ -> m ()
243
+ runConsumerBatched pollTimeout batchSize onBatch =
244
+ forever $ do
245
+ consumer <- view kafkaConsumerL
246
+
247
+ errors <- inSpan
248
+ " kafka.batchConsumer"
249
+ (defaultSpanArguments {Trace. kind = Consumer })
250
+ $ do
251
+ (errors, records) <- do
252
+ (kafkaErrors, batch) <-
253
+ partitionEithers
254
+ <$> pollMessageBatch consumer (kafkaTimeout pollTimeout) batchSize
255
+ (decodeErrors, records) <- fmap partitionEithers $ inSpan " kafka.batchConsumer.messages.decode" defaultSpanArguments $ do
256
+ let errorDecode bs = over _Left (KafkaMessageDecodeError bs) $ eitherDecodeStrict @ a bs
257
+ pure $ mapMaybe (fmap errorDecode . crValue) batch
258
+ pure
259
+ ( fmap BatchConsumerKafkaError kafkaErrors
260
+ <> fmap BatchConsumerDecodeError decodeErrors
261
+ , records
262
+ )
263
+
264
+ inSpan " kafka.batchConsumer.messages.handle" defaultSpanArguments $
265
+ onBatch records
266
+ pure $ nonEmpty errors
267
+
268
+ traverse_ logErrors errors
269
+ where
270
+ displayConsumerError = \ case
271
+ BatchConsumerDecodeError e -> displayException e
272
+ BatchConsumerKafkaError e -> displayException e
273
+ logErrors errors =
274
+ logErrorNS " kafka" $
275
+ " Batch consumer errors"
276
+ :# [" errors" .= fmap displayConsumerError errors]
277
+
278
+ kafkaTimeout :: Timeout -> Kafka. Timeout
279
+ kafkaTimeout = Kafka. Timeout . timeoutMs
280
+
223
281
-- | Like 'annotatedExceptionMessage', but use the supplied function to
224
282
-- construct an initial 'Message' that it will augment.
225
283
annotatedExceptionMessageFrom
0 commit comments