@@ -17,6 +17,29 @@ import Logging
17
17
import NIOConcurrencyHelpers
18
18
import NIOCore
19
19
20
+ /// Our `AsyncSequence` implementation wrapping an `NIOAsyncSequenceProducer`.
21
+ public struct KafkaAsyncSequence < Element> : AsyncSequence {
22
+ typealias HighLowWatermark = NIOAsyncSequenceProducerBackPressureStrategies . HighLowWatermark
23
+ typealias PollingSystem = KafkaPollingSystem < Element >
24
+ typealias WrappedSequence = NIOAsyncSequenceProducer < Element , HighLowWatermark , PollingSystem >
25
+ let wrappedSequence : WrappedSequence
26
+
27
+ /// Our `AsynceIteratorProtocol` implementation wrapping `NIOAsyncSequenceProducer.AsyncIterator`.
28
+ public struct KafkaAsyncIterator : AsyncIteratorProtocol {
29
+ let wrappedIterator : NIOAsyncSequenceProducer < Element , HighLowWatermark , PollingSystem > . AsyncIterator
30
+
31
+ public mutating func next( ) async -> Element ? {
32
+ await self . wrappedIterator. next ( )
33
+ }
34
+ }
35
+
36
+ public func makeAsyncIterator( ) -> KafkaAsyncIterator {
37
+ return KafkaAsyncIterator ( wrappedIterator: self . wrappedSequence. makeAsyncIterator ( ) )
38
+ }
39
+ }
40
+
41
+ // MARK: - KafkaPollingSystem
42
+
20
43
/// A back-pressure aware polling system for managing the poll loop that polls `librdkafka` for new acknowledgements.
21
44
final class KafkaPollingSystem < Element> : Sendable {
22
45
typealias HighLowWatermark = NIOAsyncSequenceProducerBackPressureStrategies . HighLowWatermark
@@ -26,33 +49,47 @@ final class KafkaPollingSystem<Element>: Sendable {
26
49
/// The state machine that manages the system's state transitions.
27
50
private let stateMachine : NIOLockedValueBox < StateMachine >
28
51
29
- /// Allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer``
30
- /// and to `finish` the sequence.
31
- var source : Producer . Source ? {
32
- get {
33
- self . stateMachine. withLockedValue { $0. source }
34
- }
35
- set {
36
- self . stateMachine. withLockedValue { $0. source = newValue }
37
- }
52
+ /// Initializes the ``KafkaPollingSystem``.
53
+ ///
54
+ /// - Note: ``initialize(backPressureStrategy:pollClosure:)`` still has to be invoked for proper initialization.
55
+ init ( ) {
56
+ self . stateMachine = NIOLockedValueBox ( StateMachine ( ) )
38
57
}
39
58
40
- /// Closure that is used to poll the upstream producer for new updates.
41
- /// In our case the upstream producer is the Kafka cluster.
42
- var pollClosure : ( ( ) -> Void ) ? {
43
- get {
44
- self . stateMachine. withLockedValue { $0. pollClosure }
45
- }
46
- set {
47
- self . stateMachine. withLockedValue { $0. pollClosure = newValue }
59
+ /// Initialize the ``KafkaPollingSystem`` and create the ``KafkaAsyncSequence`` that publishes
60
+ /// message acknowledgements.
61
+ ///
62
+ /// We use this second `initialize()` method to support delayed initialization,
63
+ /// which is needed because the initialization ``NIOAsyncSequenceProducer`` requires a reference
64
+ /// to an existing ``KafkaPollingSystem`` object but our ``StateMachine`` in turn needs a reference to
65
+ /// the ``NIOAsyncSequenceProducer.Source`` object.
66
+ ///
67
+ /// - Returns: The newly created ``KafkaAsyncSequence`` object.
68
+ func initialize(
69
+ backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . HighLowWatermark ,
70
+ pollClosure: @escaping ( ) -> Void
71
+ ) -> KafkaAsyncSequence < Element > {
72
+ // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
73
+ // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
74
+ // The source MUST be held by the caller and used to signal new elements or finish.
75
+ // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
76
+ // This is due to the fact that deiniting the sequence is used as part of a trigger to
77
+ // terminate the underlying source.
78
+ let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
79
+ elementType: Element . self,
80
+ backPressureStrategy: backPressureStrategy,
81
+ delegate: self
82
+ )
83
+ let sequence = KafkaAsyncSequence ( wrappedSequence: sourceAndSequence. sequence)
84
+
85
+ self . stateMachine. withLockedValue {
86
+ $0. initialize (
87
+ source: sourceAndSequence. source,
88
+ pollClosure: pollClosure
89
+ )
48
90
}
49
- }
50
91
51
- /// Initializes the ``KafkaPollingSystem``.
52
- /// Private initializer. The ``KafkaPollingSystem`` is not supposed to be initialized directly.
53
- /// It must rather be initialized using the ``KafkaPollingSystem.createSystemAndSequence`` function.
54
- init ( ) {
55
- self . stateMachine = NIOLockedValueBox ( StateMachine ( ) )
92
+ return sequence
56
93
}
57
94
58
95
/// Runs the poll loop with the specified poll interval.
@@ -62,7 +99,7 @@ final class KafkaPollingSystem<Element>: Sendable {
62
99
func run( pollInterval: Duration ) async throws {
63
100
switch self . stateMachine. withLockedValue ( { $0. run ( ) } ) {
64
101
case . alreadyClosed:
65
- return
102
+ throw KafkaError . pollLoop ( reason : " Invocation of \( #function ) failed, poll loop already closed. " )
66
103
case . alreadyRunning:
67
104
fatalError ( " Poll loop must not be started more than once " )
68
105
case . startLoop:
@@ -80,8 +117,7 @@ final class KafkaPollingSystem<Element>: Sendable {
80
117
do {
81
118
try await Task . sleep ( for: pollInterval)
82
119
} catch {
83
- let action = self . stateMachine. withLockedValue { $0. terminate ( ) }
84
- self . handleTerminateAction ( action)
120
+ self . terminate ( )
85
121
throw error
86
122
}
87
123
case . suspendPollLoop:
@@ -92,45 +128,27 @@ final class KafkaPollingSystem<Element>: Sendable {
92
128
self . stateMachine. withLockedValue { $0. suspendLoop ( continuation: continuation) }
93
129
}
94
130
} onCancel: {
95
- let action = self . stateMachine. withLockedValue { $0. terminate ( CancellationError ( ) ) }
96
- self . handleTerminateAction ( action)
131
+ self . terminate ( CancellationError ( ) )
97
132
}
98
133
case . shutdownPollLoop:
99
134
// We have been asked to close down the poll loop.
100
- let action = self . stateMachine. withLockedValue { $0. terminate ( ) }
101
- self . handleTerminateAction ( action)
135
+ self . terminate ( )
102
136
}
103
137
}
104
138
}
105
139
106
140
/// Yield new elements to the underlying `NIOAsyncSequenceProducer`.
107
141
func yield( _ element: Element ) {
108
- self . stateMachine. withLockedValue { stateMachine in
109
- switch stateMachine. state {
110
- case . idle( let source, _, _) , . producing( let source, _, _) , . stopProducing( let source, _, _, _) :
111
- // We can also yield when in .stopProducing,
112
- // the AsyncSequenceProducer will buffer for us
113
- let yieldResult = source? . yield ( element)
114
- switch yieldResult {
115
- case . produceMore:
116
- let action = stateMachine. produceMore ( )
117
- switch action {
118
- case . resume( let continuation) :
119
- continuation? . resume ( )
120
- case . none:
121
- break
122
- }
123
- case . stopProducing:
124
- stateMachine. stopProducing ( )
125
- case . dropped:
126
- let action = stateMachine. terminate ( )
127
- self . handleTerminateAction ( action)
128
- case . none:
129
- break
130
- }
131
- case . finished:
132
- return
133
- }
142
+ let action = self . stateMachine. withLockedValue { $0. yield ( element) }
143
+ switch action {
144
+ case . produceMore:
145
+ self . produceMore ( )
146
+ case . stopProducing:
147
+ self . stopProducing ( )
148
+ case . terminate:
149
+ self . terminate ( )
150
+ case . none:
151
+ break
134
152
}
135
153
}
136
154
@@ -147,9 +165,16 @@ final class KafkaPollingSystem<Element>: Sendable {
147
165
self . stateMachine. withLockedValue { $0. stopProducing ( ) }
148
166
}
149
167
168
+ // TODO: try making everything private
169
+ /// Shut down the ``KafkaPollingSystem`` and free its resources.
170
+ func terminate( _ error: Error ? = nil ) {
171
+ let action = self . stateMachine. withLockedValue { $0. terminate ( error) }
172
+ self . handleTerminateAction ( action)
173
+ }
174
+
150
175
/// Invokes the desired action after ``KafkaPollingSystem/StateMachine/terminate()``
151
176
/// has been invoked.
152
- func handleTerminateAction( _ action: StateMachine . TerminateAction ? ) {
177
+ private func handleTerminateAction( _ action: StateMachine . TerminateAction ? ) {
153
178
switch action {
154
179
case . finishSequenceSource( let source) :
155
180
source? . finish ( )
@@ -177,8 +202,7 @@ extension KafkaPollingSystem: NIOAsyncSequenceProducerDelegate {
177
202
}
178
203
179
204
func didTerminate( ) {
180
- let action = self . stateMachine. withLockedValue { $0. terminate ( ) }
181
- self . handleTerminateAction ( action)
205
+ self . terminate ( )
182
206
}
183
207
}
184
208
@@ -187,7 +211,10 @@ extension KafkaPollingSystem {
187
211
struct StateMachine {
188
212
/// The possible states of the state machine.
189
213
enum State {
190
- /// Initial state.
214
+ /// The state machine has been initialized with init() but is not yet Initialized
215
+ /// using `func initialize()` (required).
216
+ case uninitialized
217
+ /// Initialized state (idle).
191
218
case idle(
192
219
source: Producer . Source ? ,
193
220
pollClosure: ( ( ) -> Void ) ? ,
@@ -212,72 +239,18 @@ extension KafkaPollingSystem {
212
239
}
213
240
214
241
/// The current state of the state machine.
215
- var state : State
216
-
217
- /// Allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer``
218
- /// and to `finish` the sequence.
219
- var source : Producer . Source ? {
220
- get {
221
- // Extracts source from state machine
222
- switch self . state {
223
- case . idle( let source, _, _) :
224
- return source
225
- case . producing( let source, _, _) :
226
- return source
227
- case . stopProducing( let source, _, _, _) :
228
- return source
229
- case . finished:
230
- return nil
231
- }
232
- }
233
- set {
234
- // Add new source to current state
235
- switch self . state {
236
- case . idle( _, let pollClosure, let running) :
237
- self . state = . idle( source: newValue, pollClosure: pollClosure, running: running)
238
- case . producing( _, let pollClosure, let running) :
239
- self . state = . producing( source: newValue, pollClosure: pollClosure, running: running)
240
- case . stopProducing( _, let continuation, let pollClosure, let running) :
241
- self . state = . stopProducing( source: newValue, continuation: continuation, pollClosure: pollClosure, running: running)
242
- case . finished:
243
- break
244
- }
245
- }
246
- }
242
+ private var state : State = . uninitialized
247
243
248
- /// Closure that is used to poll the upstream producer for new updates.
249
- /// In our case the upstream producer is the Kafka cluster.
250
- var pollClosure : ( ( ) -> Void ) ? {
251
- get {
252
- // Extracts pollClosure from state machine
253
- switch self . state {
254
- case . idle( _, let pollClosure, _) :
255
- return pollClosure
256
- case . producing( _, let pollClosure, _) :
257
- return pollClosure
258
- case . stopProducing( _, _, let pollClosure, _) :
259
- return pollClosure
260
- case . finished:
261
- return nil
262
- }
244
+ /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are
245
+ /// not yet available when the normal initialization occurs.
246
+ mutating func initialize(
247
+ source: Producer . Source ,
248
+ pollClosure: @escaping ( ) -> Void
249
+ ) {
250
+ guard case . uninitialized = self . state else {
251
+ fatalError ( " \( #function) can only be invoked in state .uninitialized, but was invoked in state \( self . state) " )
263
252
}
264
- set {
265
- // Add new pollClosure to current state
266
- switch self . state {
267
- case . idle( let source, _, let running) :
268
- self . state = . idle( source: source, pollClosure: newValue, running: running)
269
- case . producing( let source, _, let running) :
270
- self . state = . producing( source: source, pollClosure: newValue, running: running)
271
- case . stopProducing( let source, let continuation, _, let running) :
272
- self . state = . stopProducing( source: source, continuation: continuation, pollClosure: newValue, running: running)
273
- case . finished:
274
- break
275
- }
276
- }
277
- }
278
-
279
- init ( ) {
280
- self . state = . idle( source: nil , pollClosure: nil , running: false )
253
+ self . state = . idle( source: source, pollClosure: pollClosure, running: false )
281
254
}
282
255
283
256
/// Actions to take after ``run()`` has been invoked on the ``KafkaPollingSystem/StateMachine``.
@@ -292,6 +265,8 @@ extension KafkaPollingSystem {
292
265
293
266
mutating func run( ) -> RunAction {
294
267
switch self . state {
268
+ case . uninitialized:
269
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
295
270
case . idle( let source, let pollClosure, let running) :
296
271
guard running == false else {
297
272
return . alreadyRunning
@@ -329,6 +304,8 @@ extension KafkaPollingSystem {
329
304
/// - Returns: The next action for the poll loop.
330
305
func nextPollLoopAction( ) -> PollLoopAction {
331
306
switch self . state {
307
+ case . uninitialized:
308
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
332
309
case . idle( _, let pollClosure, _) , . producing( _, let pollClosure, _) :
333
310
return . pollAndSleep( pollClosure: pollClosure)
334
311
case . stopProducing:
@@ -341,6 +318,40 @@ extension KafkaPollingSystem {
341
318
}
342
319
}
343
320
321
+ /// Actions to take after an element has been yielded to the underlying ``NIOAsyncSequenceProducer.Source``.
322
+ enum YieldAction {
323
+ /// Produce more elements.
324
+ case produceMore
325
+ /// Stop producing new elements.
326
+ case stopProducing
327
+ /// Shut down the ``KafkaPollingSystem``.
328
+ case terminate
329
+ }
330
+
331
+ /// Yield new elements to the underlying `NIOAsyncSequenceProducer`.
332
+ mutating func yield( _ element: Element ) -> YieldAction ? {
333
+ switch self . state {
334
+ case . uninitialized:
335
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
336
+ case . idle( let source, _, _) , . producing( let source, _, _) , . stopProducing( let source, _, _, _) :
337
+ // We can also yield when in .stopProducing,
338
+ // the AsyncSequenceProducer will buffer for us
339
+ let yieldResult = source? . yield ( element)
340
+ switch yieldResult {
341
+ case . produceMore:
342
+ return . produceMore
343
+ case . stopProducing:
344
+ return . stopProducing
345
+ case . dropped:
346
+ return . terminate
347
+ case . none:
348
+ return nil
349
+ }
350
+ case . finished:
351
+ return nil
352
+ }
353
+ }
354
+
344
355
/// Actions to take after ``produceMore()`` has been invoked on the ``KafkaPollingSystem/StateMachine``.
345
356
enum ProduceMoreAction {
346
357
/// Resume the given `continuation`.
@@ -350,6 +361,8 @@ extension KafkaPollingSystem {
350
361
/// Our downstream consumer allowed us to produce more elements.
351
362
mutating func produceMore( ) -> ProduceMoreAction ? {
352
363
switch self . state {
364
+ case . uninitialized:
365
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
353
366
case . finished, . producing:
354
367
break
355
368
case . stopProducing( let source, let continuation, let pollClosure, let running) :
@@ -364,6 +377,8 @@ extension KafkaPollingSystem {
364
377
/// Our downstream consumer asked us to stop producing new elements.
365
378
mutating func stopProducing( ) {
366
379
switch self . state {
380
+ case . uninitialized:
381
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
367
382
case . idle, . finished, . stopProducing:
368
383
break
369
384
case . producing( let source, let pollClosure, let running) :
@@ -377,6 +392,8 @@ extension KafkaPollingSystem {
377
392
/// After resuming the continuation, our poll loop will start running again.
378
393
mutating func suspendLoop( continuation: CheckedContinuation < Void , Error > ) {
379
394
switch self . state {
395
+ case . uninitialized:
396
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
380
397
case . finished:
381
398
return
382
399
case . stopProducing( _, . some, _, _) :
@@ -408,6 +425,8 @@ extension KafkaPollingSystem {
408
425
/// Terminate the state machine and finish producing elements.
409
426
mutating func terminate( _ error: Error ? = nil ) -> TerminateAction ? {
410
427
switch self . state {
428
+ case . uninitialized:
429
+ fatalError ( " \( #function) invoked while still in state .uninitialized " )
411
430
case . finished:
412
431
return nil
413
432
case . idle( let source, _, _) , . producing( let source, _, _) :
0 commit comments