Skip to content

Commit 298067a

Browse files
Wrap rd_kafka_consumer_poll into iterator (use librdkafka embedded backpressure) (#158)
* remove message sequence * test consumer with implicit rebalance * misc + format * remove artefact * don't check a lot of messages * fix typo * slow down first consumer to lower message to fit CI timeout * remove helpers * use exact benchmark version to avoid missing thresholds error (as no thresholds so far) * add deprecated marks for backpressure, change comment for future dev * address comments
1 parent 8be7e2f commit 298067a

File tree

4 files changed

+213
-391
lines changed

4 files changed

+213
-391
lines changed

Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

-32
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
import Crdkafka
1615
import struct Foundation.UUID
1716

1817
public struct KafkaConsumerConfiguration {
@@ -23,37 +22,6 @@ public struct KafkaConsumerConfiguration {
2322
/// Default: `.milliseconds(100)`
2423
public var pollInterval: Duration = .milliseconds(100)
2524

26-
/// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``.
27-
public struct BackPressureStrategy: Sendable, Hashable {
28-
enum _BackPressureStrategy: Sendable, Hashable {
29-
case watermark(low: Int, high: Int)
30-
}
31-
32-
let _internal: _BackPressureStrategy
33-
34-
private init(backPressureStrategy: _BackPressureStrategy) {
35-
self._internal = backPressureStrategy
36-
}
37-
38-
/// A back pressure strategy based on high and low watermarks.
39-
///
40-
/// The consumer maintains a buffer size between a low watermark and a high watermark
41-
/// to control the flow of incoming messages.
42-
///
43-
/// - Parameter low: The lower threshold for the buffer size (low watermark).
44-
/// - Parameter high: The upper threshold for the buffer size (high watermark).
45-
public static func watermark(low: Int, high: Int) -> BackPressureStrategy {
46-
return .init(backPressureStrategy: .watermark(low: low, high: high))
47-
}
48-
}
49-
50-
/// The backpressure strategy to be used for message consumption.
51-
/// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information.
52-
public var backPressureStrategy: BackPressureStrategy = .watermark(
53-
low: 10,
54-
high: 50
55-
)
56-
5725
/// A struct representing the different Kafka message consumption strategies.
5826
public struct ConsumptionStrategy: Sendable, Hashable {
5927
enum _ConsumptionStrategy: Sendable, Hashable {

0 commit comments

Comments
 (0)