Skip to content

Commit 57a4b51

Browse files
committed
Fix race condition in StateMachine
* because `StateMachine.run()` was invoked from a different Task than the other `StateMachine` methods, doing the initialization of the `StateMachine` in run led to a race condition where it could not be guaranteed that the `StateMachine` was `started` when other `StateMachine` methods were invoked * Solution: don't initialize `StateMachine` on run but rather on `init`
1 parent 77a03e5 commit 57a4b51

File tree

3 files changed

+76
-57
lines changed

3 files changed

+76
-57
lines changed

Sources/SwiftKafka/KafkaPollingSystem.swift

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ final class KafkaPollingSystem<Element>: Sendable {
2929
/// Initializes the ``KafkaBackPressurePollingSystem``.
3030
/// Private initializer. The ``KafkaBackPressurePollingSystem`` is not supposed to be initialized directly.
3131
/// It must rather be initialized using the ``KafkaBackPressurePollingSystem.createSystemAndSequence`` function.
32-
init() {
33-
self.stateMachineLock = NIOLockedValueBox(StateMachine())
32+
init(pollClosure: @escaping () -> Void) {
33+
self.stateMachineLock = NIOLockedValueBox(StateMachine(pollClosure: pollClosure))
3434
}
3535

3636
/// Runs the poll loop with the specified poll interval.
3737
///
3838
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
3939
/// - Returns: An awaitable task representing the execution of the poll loop.
40-
func run(pollInterval: Duration, pollClosure: @escaping () -> Void, source: Producer.Source?) async {
41-
switch self.stateMachineLock.withLockedValue({ $0.run(source, pollClosure) }) {
40+
func run(pollInterval: Duration) async {
41+
switch self.stateMachineLock.withLockedValue({ $0.run() }) {
4242
case .alreadyClosed:
4343
return
4444
case .alreadyRunning:
@@ -84,7 +84,7 @@ final class KafkaPollingSystem<Element>: Sendable {
8484
func yield(_ element: Element) {
8585
self.stateMachineLock.withLockedValue { stateMachine in
8686
switch stateMachine.state {
87-
case .started(let source, _, _), .producing(let source, _, _), .stopProducing(let source, _, _, _):
87+
case .idle(let source, _, _), .producing(let source, _, _), .stopProducing(let source, _, _, _):
8888
// We can also yield when in .stopProducing,
8989
// the AsyncSequenceProducer will buffer for us
9090
let yieldResult = source?.yield(element)
@@ -105,7 +105,7 @@ final class KafkaPollingSystem<Element>: Sendable {
105105
case .none:
106106
break
107107
}
108-
case .idle, .finished:
108+
case .finished:
109109
return
110110
}
111111
}
@@ -149,9 +149,7 @@ extension KafkaPollingSystem {
149149
/// The possible states of the state machine.
150150
enum State {
151151
/// Initial state.
152-
case idle
153-
/// The ``run()`` method has been invoked and the ``KafkaPollingSystem`` is ready.
154-
case started(
152+
case idle(
155153
source: Producer.Source?,
156154
pollClosure: () -> Void,
157155
running: Bool
@@ -175,7 +173,42 @@ extension KafkaPollingSystem {
175173
}
176174

177175
/// The current state of the state machine.
178-
var state: State = .idle
176+
var state: State
177+
178+
/// Allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer``
179+
/// and to `finish` the sequence.
180+
var source: Producer.Source? {
181+
get {
182+
// Extracts source from state machine
183+
switch self.state {
184+
case .idle(let source, _, _):
185+
return source
186+
case .producing(let source, _, _):
187+
return source
188+
case .stopProducing(let source, _, _, _):
189+
return source
190+
case .finished:
191+
return nil
192+
}
193+
}
194+
set {
195+
// Add new source to current state
196+
switch self.state {
197+
case .idle(_, let pollClosure, let running):
198+
self.state = .idle(source: newValue, pollClosure: pollClosure, running: running)
199+
case .producing(_, let pollClosure, let running):
200+
self.state = .producing(source: newValue, pollClosure: pollClosure, running: running)
201+
case .stopProducing(_, let continuation, let pollClosure, let running):
202+
self.state = .stopProducing(source: newValue, continuation: continuation, pollClosure: pollClosure, running: running)
203+
case .finished:
204+
break
205+
}
206+
}
207+
}
208+
209+
init(pollClosure: @escaping () -> Void) {
210+
self.state = .idle(source: nil, pollClosure: pollClosure, running: false)
211+
}
179212

180213
/// Actions to take after ``run()`` has been invoked on the ``KafkaPollingSystem/StateMachine``.
181214
enum RunAction {
@@ -187,15 +220,13 @@ extension KafkaPollingSystem {
187220
case startLoop
188221
}
189222

190-
mutating func run(_ source: Producer.Source?, _ pollClosure: @escaping () -> Void) -> RunAction {
223+
mutating func run() -> RunAction {
191224
switch self.state {
192-
case .idle:
193-
self.state = .started(source: source, pollClosure: pollClosure, running: true)
194-
case .started(let source, let pollClosure, let running):
225+
case .idle(let source, let pollClosure, let running):
195226
guard running == false else {
196227
return .alreadyRunning
197228
}
198-
self.state = .started(source: source, pollClosure: pollClosure, running: true)
229+
self.state = .idle(source: source, pollClosure: pollClosure, running: true)
199230
case .producing(let source, let pollClosure, let running):
200231
guard running == false else {
201232
return .alreadyRunning
@@ -228,9 +259,7 @@ extension KafkaPollingSystem {
228259
/// - Returns: The next action for the poll loop.
229260
func nextPollLoopAction() -> PollLoopAction {
230261
switch self.state {
231-
case .idle:
232-
fatalError("State machine must be initialized with prepare()")
233-
case .started(_, let pollClosure, _), .producing(_, let pollClosure, _):
262+
case .idle(_, let pollClosure, _), .producing(_, let pollClosure, _):
234263
return .pollAndSleep(pollClosure: pollClosure)
235264
case .stopProducing:
236265
// We were asked to stop producing,
@@ -251,12 +280,12 @@ extension KafkaPollingSystem {
251280
/// Our downstream consumer allowed us to produce more elements.
252281
mutating func produceMore() -> ProduceMoreAction? {
253282
switch self.state {
254-
case .idle, .finished, .producing:
283+
case .finished, .producing:
255284
break
256285
case .stopProducing(let source, let continuation, let pollClosure, let running):
257286
self.state = .producing(source: source, pollClosure: pollClosure, running: running)
258287
return .resume(continuation)
259-
case .started(let source, let pollClosure, let running):
288+
case .idle(let source, let pollClosure, let running):
260289
self.state = .producing(source: source, pollClosure: pollClosure, running: running)
261290
}
262291
return nil
@@ -267,8 +296,6 @@ extension KafkaPollingSystem {
267296
switch self.state {
268297
case .idle, .finished, .stopProducing:
269298
break
270-
case .started:
271-
fatalError("\(#function) is not supported in state \(self.state)")
272299
case .producing(let source, let pollClosure, let running):
273300
self.state = .stopProducing(source: source, continuation: nil, pollClosure: pollClosure, running: running)
274301
}
@@ -280,11 +307,11 @@ extension KafkaPollingSystem {
280307
/// After resuming the continuation, our poll loop will start running again.
281308
mutating func suspendLoop(continuation: CheckedContinuation<Void, Never>) {
282309
switch self.state {
283-
case .idle, .finished:
310+
case .finished:
284311
return
285312
case .stopProducing(_, .some, _, _):
286313
fatalError("Internal state inconsistency. Run loop is running more than once")
287-
case .started(let source, let pollClosure, let running), .producing(let source, let pollClosure, let running), .stopProducing(let source, _, let pollClosure, let running):
314+
case .idle(let source, let pollClosure, let running), .producing(let source, let pollClosure, let running), .stopProducing(let source, _, let pollClosure, let running):
288315
self.state = .stopProducing(source: source, continuation: continuation, pollClosure: pollClosure, running: running)
289316
}
290317
}
@@ -304,9 +331,9 @@ extension KafkaPollingSystem {
304331
/// Terminate the state machine and finish producing elements.
305332
mutating func terminate() -> TerminateAction? {
306333
switch self.state {
307-
case .idle, .finished:
334+
case .finished:
308335
return nil
309-
case .started(let source, _, _), .producing(let source, _, _):
336+
case .idle(let source, _, _), .producing(let source, _, _):
310337
self.state = .finished
311338
return .finishSequenceSource(source: source)
312339
case .stopProducing(let source, let continuation, _, _):

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ public actor KafkaProducer {
7676
/// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
7777
/// acknowledged by the Kafka cluster.
7878
public nonisolated let acknowledgements: AcknowledgedMessagesAsyncSequence
79-
private let acknowledgementsSource: AcknowledgedMessagesAsyncSequence.WrappedSequence.Source
8079

8180
/// A class that wraps a closure with a reference to that closure, allowing to change the underlying functionality
8281
/// of `funcTofunc` after it has been passed.
@@ -121,7 +120,12 @@ public actor KafkaProducer {
121120
logger: self.logger
122121
)
123122

124-
self.pollingSystem = KafkaPollingSystem()
123+
self.pollingSystem = KafkaPollingSystem(pollClosure: { [client] in
124+
client.withKafkaHandlePointer { handle in
125+
rd_kafka_poll(handle, 0)
126+
}
127+
return
128+
})
125129

126130
// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
127131
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
@@ -137,7 +141,10 @@ public actor KafkaProducer {
137141
self.acknowledgements = AcknowledgedMessagesAsyncSequence(
138142
wrappedSequence: acknowledgementsSourceAndSequence.sequence
139143
)
140-
self.acknowledgementsSource = acknowledgementsSourceAndSequence.source
144+
145+
self.pollingSystem.stateMachineLock.withLockedValue { stateMachine in
146+
stateMachine.source = acknowledgementsSourceAndSequence.source
147+
}
141148

142149
callbackClosure.wrappedClosure = { [logger, pollingSystem] messageResult in
143150
guard let messageResult else {
@@ -186,16 +193,7 @@ public actor KafkaProducer {
186193
/// - Returns: An awaitable task representing the execution of the poll loop.
187194
public func run(pollInterval: Duration = .milliseconds(100)) async {
188195
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
189-
await self.pollingSystem.run(
190-
pollInterval: pollInterval,
191-
pollClosure: { [client] in
192-
client.withKafkaHandlePointer { handle in
193-
rd_kafka_poll(handle, 0)
194-
}
195-
return
196-
},
197-
source: self.acknowledgementsSource
198-
)
196+
await self.pollingSystem.run(pollInterval: pollInterval)
199197
}
200198

201199
/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".

Tests/SwiftKafkaTests/KafkaPollingSystemTests.swift

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,12 @@ final class KafkaPollingSystemTests: XCTestCase {
4141
func testBackPressure() async throws {
4242
let pollInterval = Duration.milliseconds(100)
4343

44-
let sut = KafkaPollingSystem<Message>()
4544
let closureWrapper = ClosureWrapper()
45+
let sut = KafkaPollingSystem<Message>(pollClosure: {
46+
closureWrapper.funcTofunc()
47+
})
4648
let runTask = Task {
47-
await sut.run(
48-
pollInterval: pollInterval,
49-
pollClosure: { closureWrapper.funcTofunc() },
50-
source: nil
51-
)
49+
await sut.run(pollInterval: pollInterval)
5250
}
5351

5452
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")
@@ -95,14 +93,12 @@ final class KafkaPollingSystemTests: XCTestCase {
9593
func testNoPollsAfterPollLoopSuspension() async throws {
9694
let pollInterval = Duration.milliseconds(100)
9795

98-
let sut = KafkaPollingSystem<Message>()
9996
let closureWrapper = ClosureWrapper()
97+
let sut = KafkaPollingSystem<Message>(pollClosure: {
98+
closureWrapper.funcTofunc()
99+
})
100100
let runTask = Task {
101-
await sut.run(
102-
pollInterval: pollInterval,
103-
pollClosure: { closureWrapper.funcTofunc() },
104-
source: nil
105-
)
101+
await sut.run(pollInterval: pollInterval)
106102
}
107103

108104
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")
@@ -137,14 +133,12 @@ final class KafkaPollingSystemTests: XCTestCase {
137133
func testRunTaskCancellationShutsDownStateMachine() async throws {
138134
let pollInterval = Duration.milliseconds(100)
139135

140-
let sut = KafkaPollingSystem<Message>()
141136
let closureWrapper = ClosureWrapper()
137+
let sut = KafkaPollingSystem<Message>(pollClosure: {
138+
closureWrapper.funcTofunc()
139+
})
142140
let runTask = Task {
143-
await sut.run(
144-
pollInterval: pollInterval,
145-
pollClosure: { closureWrapper.funcTofunc() },
146-
source: nil
147-
)
141+
await sut.run(pollInterval: pollInterval)
148142
}
149143

150144
let expectation = XCTestExpectation(description: "Poll closure invoked after initial produceMore()")

0 commit comments

Comments
 (0)