File tree Expand file tree Collapse file tree 2 files changed +5
-7
lines changed Expand file tree Collapse file tree 2 files changed +5
-7
lines changed Original file line number Diff line number Diff line change @@ -27,7 +27,7 @@ final class KafkaBackPressurePollingSystem {
27
27
/// The state machine that manages the system's state transitions.
28
28
let stateMachineLock : NIOLockedValueBox < StateMachine >
29
29
30
- // TODO: docc
30
+ /// Closure that takes care of polling `librdkafka` for new messages.
31
31
var pollClosure : ( ( ) -> Void ) ?
32
32
/// The ``NIOAsyncSequenceProducer.Source`` used for yielding the messages to the ``NIOAsyncSequenceProducer``.
33
33
var sequenceSource : Producer . Source ? {
Original file line number Diff line number Diff line change @@ -69,8 +69,8 @@ public actor KafkaProducer {
69
69
private var client : KafkaClient
70
70
/// Mechanism that polls the Kafka cluster for updates periodically.
71
71
private let pollingSystem : KafkaBackPressurePollingSystem
72
- // TODO: docc
73
- private var runTask : Task < Void , Never > ?
72
+ /// Task that polls `librdkafka` for new acknowledgements at regular intervals.
73
+ private var pollTask : Task < Void , Never > ?
74
74
75
75
/// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
76
76
/// acknowledged by the Kafka cluster.
@@ -123,8 +123,7 @@ public actor KafkaProducer {
123
123
logger: self . logger
124
124
)
125
125
126
- // TODO: expose run to user?
127
- self . runTask = Task { [ pollingSystem] in
126
+ self . pollTask = Task { [ pollingSystem] in
128
127
await pollingSystem. run ( pollInterval: . milliseconds( 100 ) )
129
128
}
130
129
@@ -167,8 +166,7 @@ public actor KafkaProducer {
167
166
rd_kafka_topic_destroy ( topicHandle)
168
167
}
169
168
170
- // TODO: kill PollingSystem
171
- self . runTask? . cancel ( )
169
+ self . pollTask? . cancel ( )
172
170
173
171
self . state = . shutDown
174
172
}
You can’t perform that action at this time.
0 commit comments