Skip to content

Commit 5b07fe2

Browse files
KafkaProducer: refactor flushing (#94)
* `KafkaProducer`: refactor flushing Motivation: With our own flushing implementation we were flushing until `rd_kafka_outq_len` reached `0`. However `rd_kafka_outq_len` also takes other events such as statistics into account which led to a race where we were flushing for a very long time because more and more other events were produced to the queue while we were flushing. Modifications: * `KafkaClient`: * remove `var outgoingQueueSize` * add new method `flush(timeoutMilliseconds:)` that executes the blocking `rd_kafka_flush` call on a `DispatchQueue` but vends this as an `async func` * `KafkaProducer`: * rename `KafkaProducer.StateMachine.State.flushing` to `KafkaProducer.StateMachine.State.finishing` * invoke `KafkaClient.flush` before terminating poll loop * `KafkaProducerConfiguration`: add flush timeout * Make `KafkaProducer.flushTimeoutMilliseconds` `Int` * Review Franz Modifications: * add messages to `preconditions` * remove extra line in `RDKafkaClient.flush`'s parameter documentation
1 parent 51b5e64 commit 5b07fe2

File tree

3 files changed

+53
-26
lines changed

3 files changed

+53
-26
lines changed

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ public struct KafkaProducerConfiguration {
2020
/// Default: `.milliseconds(100)`
2121
public var pollInterval: Duration = .milliseconds(100)
2222

23+
/// Maximum timeout for flushing outstanding produce requests when the ``KafkaProducer`` is shutting down.
24+
/// Default: `10000`
25+
public var flushTimeoutMilliseconds: Int = 10000 {
26+
didSet {
27+
precondition(
28+
0...Int(Int32.max) ~= self.flushTimeoutMilliseconds,
29+
"Flush timeout outside of valid range \(0...Int32.max)"
30+
)
31+
}
32+
}
33+
2334
// MARK: - Producer-specific Config Properties
2435

2536
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,12 @@ public final class KafkaProducer: Service, Sendable {
222222
}
223223
}
224224
try await Task.sleep(for: self.config.pollInterval)
225-
case .terminatePollLoopAndFinishSource(let source):
225+
case .flushFinishSourceAndTerminatePollLoop(let client, let source):
226+
precondition(
227+
0...Int(Int32.max) ~= self.config.flushTimeoutMilliseconds,
228+
"Flush timeout outside of valid range \(0...Int32.max)"
229+
)
230+
try await client.flush(timeoutMilliseconds: Int32(self.config.flushTimeoutMilliseconds))
226231
source?.finish()
227232
return
228233
case .terminatePollLoop:
@@ -297,7 +302,7 @@ extension KafkaProducer {
297302
///
298303
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
299304
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
300-
case flushing(
305+
case finishing(
301306
client: RDKafkaClient,
302307
source: Producer.Source?
303308
)
@@ -336,10 +341,12 @@ extension KafkaProducer {
336341
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
337342
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
338343
case pollAndYield(client: RDKafkaClient, source: Producer.Source?)
339-
/// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
344+
/// Flush any outstanding producer messages.
345+
/// Then terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
340346
///
347+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
341348
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
342-
case terminatePollLoopAndFinishSource(source: Producer.Source?)
349+
case flushFinishSourceAndTerminatePollLoop(client: RDKafkaClient, source: Producer.Source?)
343350
/// Terminate the poll loop.
344351
case terminatePollLoop
345352
}
@@ -356,13 +363,8 @@ extension KafkaProducer {
356363
return .pollAndYield(client: client, source: source)
357364
case .consumptionStopped(let client):
358365
return .pollWithoutYield(client: client)
359-
case .flushing(let client, let source):
360-
if client.outgoingQueueSize > 0 {
361-
return .pollAndYield(client: client, source: source)
362-
} else {
363-
self.state = .finished
364-
return .terminatePollLoopAndFinishSource(source: source)
365-
}
366+
case .finishing(let client, let source):
367+
return .flushFinishSourceAndTerminatePollLoop(client: client, source: source)
366368
case .finished:
367369
return .terminatePollLoop
368370
}
@@ -402,8 +404,8 @@ extension KafkaProducer {
402404
)
403405
case .consumptionStopped:
404406
throw KafkaError.connectionClosed(reason: "Sequence consuming acknowledgements was abruptly terminated, producer closed")
405-
case .flushing:
406-
throw KafkaError.connectionClosed(reason: "Producer in the process of flushing and shutting down")
407+
case .finishing:
408+
throw KafkaError.connectionClosed(reason: "Producer in the process of finishing")
407409
case .finished:
408410
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
409411
}
@@ -428,9 +430,9 @@ extension KafkaProducer {
428430
case .started(let client, _, let source, _):
429431
self.state = .consumptionStopped(client: client)
430432
return .finishSource(source: source)
431-
case .flushing(let client, let source):
433+
case .finishing(let client, let source):
432434
// Setting source to nil to prevent incoming acknowledgements from buffering in `source`
433-
self.state = .flushing(client: client, source: nil)
435+
self.state = .finishing(client: client, source: nil)
434436
return .finishSource(source: source)
435437
case .finished:
436438
break
@@ -446,10 +448,10 @@ extension KafkaProducer {
446448
case .uninitialized:
447449
fatalError("\(#function) invoked while still in state \(self.state)")
448450
case .started(let client, _, let source, _):
449-
self.state = .flushing(client: client, source: source)
451+
self.state = .finishing(client: client, source: source)
450452
case .consumptionStopped(let client):
451-
self.state = .flushing(client: client, source: nil)
452-
case .flushing, .finished:
453+
self.state = .finishing(client: client, source: nil)
454+
case .finishing, .finished:
453455
break
454456
}
455457
}

Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Crdkafka
16+
import Dispatch
1617
import Logging
1718

1819
/// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
@@ -389,6 +390,27 @@ final class RDKafkaClient: Sendable {
389390
}
390391
}
391392

393+
/// Flush any outstanding produce requests.
394+
///
395+
/// Parameters:
396+
/// - timeoutMilliseconds: Maximum time to wait for outstanding messages to be flushed.
397+
func flush(timeoutMilliseconds: Int32) async throws {
398+
// rd_kafka_flush is blocking and there is no convenient way to make it non-blocking.
399+
// We therefore execute rd_kafka_flush on a DispatchQueue to ensure it gets executed
400+
// on a separate thread that is not part of Swift Concurrency's cooperative thread pool.
401+
let queue = DispatchQueue(label: "com.swift-server.swift-kafka.flush")
402+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
403+
queue.async {
404+
let error = rd_kafka_flush(self.kafkaHandle, timeoutMilliseconds)
405+
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
406+
continuation.resume(throwing: KafkaError.rdKafkaError(wrapping: error))
407+
} else {
408+
continuation.resume()
409+
}
410+
}
411+
}
412+
}
413+
392414
/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
393415
/// leaving the consumer group (if applicable).
394416
///
@@ -407,14 +429,6 @@ final class RDKafkaClient: Sendable {
407429
rd_kafka_consumer_closed(self.kafkaHandle) == 1
408430
}
409431

410-
/// Returns the current out queue length.
411-
///
412-
/// This means the number of producer messages that wait to be sent + the number of any
413-
/// callbacks that are waiting to be executed by invoking `rd_kafka_poll`.
414-
var outgoingQueueSize: Int32 {
415-
return rd_kafka_outq_len(self.kafkaHandle)
416-
}
417-
418432
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
419433
/// - Warning: Do not escape the pointer from the closure for later use.
420434
/// - Parameter body: The closure will use the Kafka handle pointer.

0 commit comments

Comments
 (0)