@@ -37,7 +37,7 @@ final class KafkaPollingSystem<Element>: Sendable {
37
37
///
38
38
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
39
39
/// - Returns: An awaitable task representing the execution of the poll loop.
40
- func run( pollInterval: Duration , pollClosure: @escaping ( ) -> Void , source: Producer . Source ) async {
40
+ func run( pollInterval: Duration , pollClosure: @escaping ( ) -> Void , source: Producer . Source ? ) async {
41
41
switch self . stateMachineLock. withLockedValue ( { $0. run ( source, pollClosure) } ) {
42
42
case . alreadyClosed:
43
43
return
@@ -87,7 +87,7 @@ final class KafkaPollingSystem<Element>: Sendable {
87
87
case . started( let source, _, _) , . producing( let source, _, _) , . stopProducing( let source, _, _, _) :
88
88
// We can also yield when in .stopProducing,
89
89
// the AsyncSequenceProducer will buffer for us
90
- let yieldResult = source. yield ( element)
90
+ let yieldResult = source? . yield ( element)
91
91
switch yieldResult {
92
92
case . produceMore:
93
93
let action = stateMachine. produceMore ( )
@@ -102,6 +102,8 @@ final class KafkaPollingSystem<Element>: Sendable {
102
102
case . dropped:
103
103
let action = stateMachine. terminate ( )
104
104
self . handleTerminateAction ( action)
105
+ case . none:
106
+ break
105
107
}
106
108
case . idle, . finished:
107
109
return
@@ -114,9 +116,9 @@ final class KafkaPollingSystem<Element>: Sendable {
114
116
func handleTerminateAction( _ action: StateMachine . TerminateAction ? ) {
115
117
switch action {
116
118
case . finishSequenceSource( let source) :
117
- source. finish ( )
119
+ source? . finish ( )
118
120
case . finishSequenceSourceAndResume( let source, let continuation) :
119
- source. finish ( )
121
+ source? . finish ( )
120
122
continuation? . resume ( )
121
123
case . none:
122
124
break
@@ -150,20 +152,20 @@ extension KafkaPollingSystem {
150
152
case idle
151
153
/// The ``run()`` method has been invoked and the ``KafkaPollingSystem`` is ready.
152
154
case started(
153
- source: Producer . Source ,
155
+ source: Producer . Source ? ,
154
156
pollClosure: ( ) -> Void ,
155
157
running: Bool
156
158
)
157
159
/// The system up and producing acknowledgement messages.
158
160
case producing(
159
- source: Producer . Source ,
161
+ source: Producer . Source ? ,
160
162
pollClosure: ( ) -> Void ,
161
163
running: Bool
162
164
)
163
165
/// The pool loop is currently suspended and we are waiting for an invocation
164
166
/// of `produceMore()` to continue producing messages.
165
167
case stopProducing(
166
- source: Producer . Source ,
168
+ source: Producer . Source ? ,
167
169
continuation: CheckedContinuation < Void , Never > ? ,
168
170
pollClosure: ( ) -> Void ,
169
171
running: Bool
@@ -185,7 +187,7 @@ extension KafkaPollingSystem {
185
187
case startLoop
186
188
}
187
189
188
- mutating func run( _ source: Producer . Source , _ pollClosure: @escaping ( ) -> Void ) -> RunAction {
190
+ mutating func run( _ source: Producer . Source ? , _ pollClosure: @escaping ( ) -> Void ) -> RunAction {
189
191
switch self . state {
190
192
case . idle:
191
193
self . state = . started( source: source, pollClosure: pollClosure, running: true )
@@ -290,11 +292,11 @@ extension KafkaPollingSystem {
290
292
/// Actions to take after ``terminate()`` has been invoked on the ``KafkaPollingSystem/StateMachine``.
291
293
enum TerminateAction {
292
294
/// Invoke `finish()` on the given `NIOAsyncSequenceProducer.Source`.
293
- case finishSequenceSource( source: Producer . Source )
295
+ case finishSequenceSource( source: Producer . Source ? )
294
296
/// Invoke `finish()` on the given `NIOAsyncSequenceProducer.Source`
295
297
/// and resume the given `continuation`.
296
298
case finishSequenceSourceAndResume(
297
- source: Producer . Source ,
299
+ source: Producer . Source ? ,
298
300
continuation: CheckedContinuation < Void , Never > ?
299
301
)
300
302
}
0 commit comments