Skip to content

Commit 521d9b2

Browse files
committed
Review Franz
Modifications: * rename `KafkaPollingSystem.stateMachineLock` to `stateMachine` * make `KafkaPollingSystem.stateMachine` private * `KafkaPollingSystem.run()`: rethrow if `Task.sleep` fails * `KafkaPollingSystem.run()`: throw `CancellationError` when Task is cancelled while suspended * remove unneccesary `runTask.cancel()` invocations * `README`: use `TaskGroup` in `KafkaProducer` example
1 parent 57a4b51 commit 521d9b2

File tree

6 files changed

+112
-82
lines changed

6 files changed

+112
-82
lines changed

README.md

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,28 @@ let producer = try await KafkaProducer(
1616
logger: .kafkaTest // Your logger here
1717
)
1818

19-
let runTask = Task {
20-
await producer.run()
21-
}
19+
await withThrowingTaskGroup(of: Void.self) { group in
2220

23-
let messageID = try await producer.sendAsync(
24-
KafkaProducerMessage(
25-
topic: "topic-name",
26-
value: "Hello, World!"
27-
)
28-
)
21+
group.addTask {
22+
try await producer.run()
23+
}
2924

30-
for await acknowledgement in producer.acknowledgements {
31-
// Check if acknowledgement belongs to the sent message
32-
}
25+
group.addTask {
26+
let messageID = try await producer.sendAsync(
27+
KafkaProducerMessage(
28+
topic: "topic-name",
29+
value: "Hello, World!"
30+
)
31+
)
3332

34-
runTask.cancel()
33+
for await acknowledgement in producer.acknowledgements {
34+
// Check if acknowledgement belongs to the sent message
35+
}
3536

36-
// Required
37-
await producer.shutdownGracefully()
37+
// Required
38+
await producer.shutdownGracefully()
39+
}
40+
}
3841
```
3942

4043
### Consumer API

Sources/SwiftKafka/KafkaPollingSystem.swift

Lines changed: 67 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,32 @@ final class KafkaPollingSystem<Element>: Sendable {
2424
typealias Producer = NIOAsyncSequenceProducer<Element, HighLowWatermark, KafkaPollingSystem>
2525

2626
/// The state machine that manages the system's state transitions.
27-
let stateMachineLock: NIOLockedValueBox<StateMachine>
27+
private let stateMachine: NIOLockedValueBox<StateMachine>
2828

29-
/// Initializes the ``KafkaBackPressurePollingSystem``.
30-
/// Private initializer. The ``KafkaBackPressurePollingSystem`` is not supposed to be initialized directly.
31-
/// It must rather be initialized using the ``KafkaBackPressurePollingSystem.createSystemAndSequence`` function.
29+
/// Allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer``
30+
/// and to `finish` the sequence.
31+
var source: Producer.Source? {
32+
get {
33+
stateMachine.withLockedValue { $0.source }
34+
}
35+
set {
36+
stateMachine.withLockedValue { $0.source = newValue }
37+
}
38+
}
39+
40+
/// Initializes the ``KafkaPollingSystem``.
41+
/// Private initializer. The ``KafkaPollingSystem`` is not supposed to be initialized directly.
42+
/// It must rather be initialized using the ``KafkaPollingSystem.createSystemAndSequence`` function.
3243
init(pollClosure: @escaping () -> Void) {
33-
self.stateMachineLock = NIOLockedValueBox(StateMachine(pollClosure: pollClosure))
44+
self.stateMachine = NIOLockedValueBox(StateMachine(pollClosure: pollClosure))
3445
}
3546

3647
/// Runs the poll loop with the specified poll interval.
3748
///
3849
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
3950
/// - Returns: An awaitable task representing the execution of the poll loop.
40-
func run(pollInterval: Duration) async {
41-
switch self.stateMachineLock.withLockedValue({ $0.run() }) {
51+
func run(pollInterval: Duration) async throws {
52+
switch self.stateMachine.withLockedValue({ $0.run() }) {
4253
case .alreadyClosed:
4354
return
4455
case .alreadyRunning:
@@ -48,7 +59,7 @@ final class KafkaPollingSystem<Element>: Sendable {
4859
}
4960

5061
while true {
51-
let action = self.stateMachineLock.withLockedValue { $0.nextPollLoopAction() }
62+
let action = self.nextPollLoopAction()
5263

5364
switch action {
5465
case .pollAndSleep(let pollClosure):
@@ -58,31 +69,32 @@ final class KafkaPollingSystem<Element>: Sendable {
5869
do {
5970
try await Task.sleep(for: pollInterval)
6071
} catch {
61-
let action = self.stateMachineLock.withLockedValue { $0.terminate() }
72+
let action = self.stateMachine.withLockedValue { $0.terminate() }
6273
self.handleTerminateAction(action)
74+
throw error
6375
}
6476
case .suspendPollLoop:
6577
// The downstream consumer asked us to stop sending new messages.
6678
// We therefore await until we are unsuspended again.
67-
await withTaskCancellationHandler {
68-
await withCheckedContinuation { continuation in
69-
self.stateMachineLock.withLockedValue { $0.suspendLoop(continuation: continuation) }
79+
try await withTaskCancellationHandler {
80+
try await withCheckedThrowingContinuation { continuation in
81+
self.stateMachine.withLockedValue { $0.suspendLoop(continuation: continuation) }
7082
}
7183
} onCancel: {
72-
let action = self.stateMachineLock.withLockedValue { $0.terminate() }
84+
let action = self.stateMachine.withLockedValue { $0.terminate(CancellationError()) }
7385
self.handleTerminateAction(action)
7486
}
7587
case .shutdownPollLoop:
7688
// We have been asked to close down the poll loop.
77-
let action = self.stateMachineLock.withLockedValue { $0.terminate() }
89+
let action = self.stateMachine.withLockedValue { $0.terminate() }
7890
self.handleTerminateAction(action)
7991
}
8092
}
8193
}
8294

8395
/// Yield new elements to the underlying `NIOAsyncSequenceProducer`.
8496
func yield(_ element: Element) {
85-
self.stateMachineLock.withLockedValue { stateMachine in
97+
self.stateMachine.withLockedValue { stateMachine in
8698
switch stateMachine.state {
8799
case .idle(let source, _, _), .producing(let source, _, _), .stopProducing(let source, _, _, _):
88100
// We can also yield when in .stopProducing,
@@ -111,6 +123,19 @@ final class KafkaPollingSystem<Element>: Sendable {
111123
}
112124
}
113125

126+
/// Determines the next action to be taken in the poll loop based on the current state.
127+
///
128+
/// - Returns: The next action for the poll loop.
129+
func nextPollLoopAction() -> KafkaPollingSystem.StateMachine.PollLoopAction {
130+
return self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
131+
}
132+
133+
/// Stop producing new elements to the
134+
/// `source` ``NIOAsyncSequenceProducer``.
135+
func stopProducing() {
136+
stateMachine.withLockedValue { $0.stopProducing() }
137+
}
138+
114139
/// Invokes the desired action after ``KafkaPollingSystem/StateMachine/terminate()``
115140
/// has been invoked.
116141
func handleTerminateAction(_ action: StateMachine.TerminateAction?) {
@@ -120,6 +145,9 @@ final class KafkaPollingSystem<Element>: Sendable {
120145
case .finishSequenceSourceAndResume(let source, let continuation):
121146
source?.finish()
122147
continuation?.resume()
148+
case .finishSequenceSourceAndResumeWithError(let source, let continuation, let error):
149+
source?.finish()
150+
continuation?.resume(throwing: error)
123151
case .none:
124152
break
125153
}
@@ -128,7 +156,7 @@ final class KafkaPollingSystem<Element>: Sendable {
128156

129157
extension KafkaPollingSystem: NIOAsyncSequenceProducerDelegate {
130158
func produceMore() {
131-
let action = self.stateMachineLock.withLockedValue { $0.produceMore() }
159+
let action = self.stateMachine.withLockedValue { $0.produceMore() }
132160
switch action {
133161
case .resume(let continuation):
134162
continuation?.resume()
@@ -138,13 +166,13 @@ extension KafkaPollingSystem: NIOAsyncSequenceProducerDelegate {
138166
}
139167

140168
func didTerminate() {
141-
let action = self.stateMachineLock.withLockedValue { $0.terminate() }
169+
let action = self.stateMachine.withLockedValue { $0.terminate() }
142170
self.handleTerminateAction(action)
143171
}
144172
}
145173

146174
extension KafkaPollingSystem {
147-
/// The state machine used by the ``KafkaBackPressurePollingSystem``.
175+
/// The state machine used by the ``KafkaPollingSystem``.
148176
struct StateMachine {
149177
/// The possible states of the state machine.
150178
enum State {
@@ -164,7 +192,7 @@ extension KafkaPollingSystem {
164192
/// of `produceMore()` to continue producing messages.
165193
case stopProducing(
166194
source: Producer.Source?,
167-
continuation: CheckedContinuation<Void, Never>?,
195+
continuation: CheckedContinuation<Void, Error>?,
168196
pollClosure: () -> Void,
169197
running: Bool
170198
)
@@ -274,7 +302,7 @@ extension KafkaPollingSystem {
274302
/// Actions to take after ``produceMore()`` has been invoked on the ``KafkaPollingSystem/StateMachine``.
275303
enum ProduceMoreAction {
276304
/// Resume the given `continuation`.
277-
case resume(CheckedContinuation<Void, Never>?)
305+
case resume(CheckedContinuation<Void, Error>?)
278306
}
279307

280308
/// Our downstream consumer allowed us to produce more elements.
@@ -305,7 +333,7 @@ extension KafkaPollingSystem {
305333
///
306334
/// - Parameter continuation: The continuation that will be resumed once we are allowed to produce again.
307335
/// After resuming the continuation, our poll loop will start running again.
308-
mutating func suspendLoop(continuation: CheckedContinuation<Void, Never>) {
336+
mutating func suspendLoop(continuation: CheckedContinuation<Void, Error>) {
309337
switch self.state {
310338
case .finished:
311339
return
@@ -324,12 +352,19 @@ extension KafkaPollingSystem {
324352
/// and resume the given `continuation`.
325353
case finishSequenceSourceAndResume(
326354
source: Producer.Source?,
327-
continuation: CheckedContinuation<Void, Never>?
355+
continuation: CheckedContinuation<Void, Error>?
356+
)
357+
/// Invoke `finish()` on the given `NIOAsyncSequenceProducer.Source`
358+
/// and resume the given `continuation` with an error.
359+
case finishSequenceSourceAndResumeWithError(
360+
source: Producer.Source?,
361+
continuation: CheckedContinuation<Void, Error>?,
362+
error: Error
328363
)
329364
}
330365

331366
/// Terminate the state machine and finish producing elements.
332-
mutating func terminate() -> TerminateAction? {
367+
mutating func terminate(_ error: Error? = nil) -> TerminateAction? {
333368
switch self.state {
334369
case .finished:
335370
return nil
@@ -338,7 +373,15 @@ extension KafkaPollingSystem {
338373
return .finishSequenceSource(source: source)
339374
case .stopProducing(let source, let continuation, _, _):
340375
self.state = .finished
341-
return .finishSequenceSourceAndResume(source: source, continuation: continuation)
376+
if let error = error {
377+
return .finishSequenceSourceAndResumeWithError(
378+
source: source,
379+
continuation: continuation,
380+
error: error
381+
)
382+
} else {
383+
return .finishSequenceSourceAndResume(source: source, continuation: continuation)
384+
}
342385
}
343386
}
344387
}

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,7 @@ public actor KafkaProducer {
142142
wrappedSequence: acknowledgementsSourceAndSequence.sequence
143143
)
144144

145-
self.pollingSystem.stateMachineLock.withLockedValue { stateMachine in
146-
stateMachine.source = acknowledgementsSourceAndSequence.source
147-
}
145+
self.pollingSystem.source = acknowledgementsSourceAndSequence.source
148146

149147
callbackClosure.wrappedClosure = { [logger, pollingSystem] messageResult in
150148
guard let messageResult else {
@@ -191,9 +189,9 @@ public actor KafkaProducer {
191189
///
192190
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
193191
/// - Returns: An awaitable task representing the execution of the poll loop.
194-
public func run(pollInterval: Duration = .milliseconds(100)) async {
192+
public func run(pollInterval: Duration = .milliseconds(100)) async throws {
195193
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
196-
await self.pollingSystem.run(pollInterval: pollInterval)
194+
try await self.pollingSystem.run(pollInterval: pollInterval)
197195
}
198196

199197
/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ final class SwiftKafkaTests: XCTestCase {
7070

7171
func testProduceAndConsumeWithConsumerGroup() async throws {
7272
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
73-
let runTask = Task {
74-
await producer.run()
73+
let _ = Task {
74+
try await producer.run()
7575
}
7676

7777
self.consumerConfig.groupID = "subscription-test-group-id"
@@ -105,13 +105,12 @@ final class SwiftKafkaTests: XCTestCase {
105105
}
106106

107107
await producer.shutdownGracefully()
108-
runTask.cancel()
109108
}
110109

111110
func testProduceAndConsumeWithAssignedTopicPartition() async throws {
112111
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
113-
let runTask = Task {
114-
await producer.run()
112+
let _ = Task {
113+
try await producer.run()
115114
}
116115

117116
let consumer = try KafkaConsumer(
@@ -146,13 +145,12 @@ final class SwiftKafkaTests: XCTestCase {
146145
}
147146

148147
await producer.shutdownGracefully()
149-
runTask.cancel()
150148
}
151149

152150
func testProduceAndConsumeWithCommitSync() async throws {
153151
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
154-
let runTask = Task {
155-
await producer.run()
152+
let _ = Task {
153+
try await producer.run()
156154
}
157155

158156
self.consumerConfig.groupID = "commit-sync-test-group-id"
@@ -182,7 +180,6 @@ final class SwiftKafkaTests: XCTestCase {
182180
XCTAssertEqual(testMessages.count, consumedMessages.count)
183181

184182
await producer.shutdownGracefully()
185-
runTask.cancel()
186183

187184
// Additionally test that commit does not work on closed consumer
188185
do {

Tests/SwiftKafkaTests/KafkaPollingSystemTests.swift

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ final class KafkaPollingSystemTests: XCTestCase {
4545
let sut = KafkaPollingSystem<Message>(pollClosure: {
4646
closureWrapper.funcTofunc()
4747
})
48-
let runTask = Task {
49-
await sut.run(pollInterval: pollInterval)
48+
let _ = Task {
49+
try await sut.run(pollInterval: pollInterval)
5050
}
5151

5252
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")
@@ -86,8 +86,6 @@ final class KafkaPollingSystemTests: XCTestCase {
8686
} else {
8787
XCTFail()
8888
}
89-
90-
runTask.cancel()
9189
}
9290

9391
func testNoPollsAfterPollLoopSuspension() async throws {
@@ -97,8 +95,8 @@ final class KafkaPollingSystemTests: XCTestCase {
9795
let sut = KafkaPollingSystem<Message>(pollClosure: {
9896
closureWrapper.funcTofunc()
9997
})
100-
let runTask = Task {
101-
await sut.run(pollInterval: pollInterval)
98+
let _ = Task {
99+
try await sut.run(pollInterval: pollInterval)
102100
}
103101

104102
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")
@@ -126,8 +124,6 @@ final class KafkaPollingSystemTests: XCTestCase {
126124
}
127125

128126
try await Task.sleep(for: .seconds(5))
129-
130-
runTask.cancel()
131127
}
132128

133129
func testRunTaskCancellationShutsDownStateMachine() async throws {
@@ -138,7 +134,12 @@ final class KafkaPollingSystemTests: XCTestCase {
138134
closureWrapper.funcTofunc()
139135
})
140136
let runTask = Task {
141-
await sut.run(pollInterval: pollInterval)
137+
do {
138+
try await sut.run(pollInterval: pollInterval)
139+
XCTFail("Should have thrown error")
140+
} catch {
141+
XCTAssertTrue(error is CancellationError)
142+
}
142143
}
143144

144145
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")
@@ -177,11 +178,4 @@ final class KafkaPollingSystemTests: XCTestCase {
177178

178179
/// These testing-only methods provide more convenient access to the polling system's locked `stateMachine` methods.
179180
extension KafkaPollingSystem {
180-
func nextPollLoopAction() -> KafkaPollingSystem.StateMachine.PollLoopAction {
181-
return self.stateMachineLock.withLockedValue { $0.nextPollLoopAction() }
182-
}
183-
184-
func stopProducing() {
185-
stateMachineLock.withLockedValue { $0.stopProducing() }
186-
}
187181
}

0 commit comments

Comments
 (0)