Skip to content

Commit ff2cb71

Browse files
committed
Have two factory methods creating KafkaProducer
Motivation: We want to have a `KafkaProducer` that is not consuming any acknowledgements. This means it is initialized without a `deliveryReportCallback` which in turn means that `librdkafka` will not queue any incoming acknowledgements which prevents us from running out of memory in that case. Modifications: * add two new factory methods for creating `KafkaProducer`: * `KafkaProducer.newProducer` * `KafkaProducer.newProducerWithAcknowledgements` * update README
1 parent 9c34dab commit ff2cb71

File tree

5 files changed

+77
-41
lines changed

5 files changed

+77
-41
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late
1111
```swift
1212
let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])
1313

14-
let (producer, acknowledgements) = try await KafkaProducer.newProducer(
14+
let (producer, acknowledgements) = try await KafkaProducer.newProducerWithAcknowledgements(
1515
config: config,
1616
logger: .kafkaTest // Your logger here
1717
)

Sources/SwiftKafka/KafkaPollingSystem.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ final class KafkaPollingSystem<Element>: Sendable {
3333
self.stateMachine = NIOLockedValueBox(StateMachine())
3434
}
3535

36-
/// Initialize the ``KafkaPollingSystem`` and create the ``KafkaAsyncSequence`` that publishes
36+
/// Initialize the ``KafkaPollingSystem`` and create the ``NIOAsyncSequenceProducer`` that publishes
3737
/// message acknowledgements.
3838
///
3939
/// We use this second `initialize()` method to support delayed initialization,
4040
/// which is needed because the initialization ``NIOAsyncSequenceProducer`` requires a reference
4141
/// to an existing ``KafkaPollingSystem`` object but our ``StateMachine`` in turn needs a reference to
4242
/// the ``NIOAsyncSequenceProducer.Source`` object.
4343
///
44-
/// - Returns: The newly created ``KafkaAsyncSequence`` object.
44+
/// - Returns: The newly created ``NIOAsyncSequenceProducer`` object.
4545
func initialize(
4646
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
4747
pollClosure: @escaping () -> Void

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -70,58 +70,91 @@ public actor KafkaProducer {
7070
/// Mechanism that polls the Kafka cluster for updates periodically.
7171
private let pollingSystem: KafkaPollingSystem<
7272
Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
73-
>
73+
>?
7474
/// Used for handling the connection to the Kafka cluster.
7575
private let client: KafkaClient
7676

77-
// Private. `KafkaProducer.newProducer` should be used to create a new producer.
77+
// Private initializer, use factory methods to create KafkaProducer
7878
/// Initialize a new ``KafkaProducer``.
7979
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
8080
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
8181
/// - Parameter logger: A logger.
8282
/// - Throws: A ``KafkaError`` if initializing the producer failed.
8383
private init(
84-
config: KafkaProducerConfig,
84+
client: KafkaClient,
85+
pollingSystem: KafkaPollingSystem<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>? = nil,
8586
topicConfig: KafkaTopicConfig,
8687
logger: Logger
8788
) async throws {
89+
self.client = client
90+
self.pollingSystem = pollingSystem
8891
self.topicConfig = topicConfig
89-
self.logger = logger
9092
self.topicHandles = [:]
93+
self.logger = logger
9194
self.state = .started
95+
}
9296

93-
self.pollingSystem = KafkaPollingSystem<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>()
94-
95-
self.client = try RDKafka.createClient(
97+
/// Initialize a new ``KafkaProducer`` that ignores incoming message acknowledgements.
98+
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
99+
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
100+
/// - Parameter logger: A logger.
101+
/// - Returns: The newly created ``KafkaProducer``.
102+
/// - Throws: A ``KafkaError`` if initializing the producer failed.
103+
public static func newProducer(
104+
config: KafkaProducerConfig = KafkaProducerConfig(),
105+
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
106+
logger: Logger
107+
) async throws -> KafkaProducer {
108+
let client = try RDKafka.createClient(
96109
type: .producer,
97110
configDictionary: config.dictionary,
98-
callback: { [logger, pollingSystem] messageResult in
99-
guard let messageResult else {
100-
logger.error("Could not resolve acknowledged message")
101-
return
102-
}
111+
// Having no callback will discard any incoming acknowlegement messages
112+
// Ref: rdkafka_broker.c:rd_kafka_dr_msgq
113+
callback: nil,
114+
logger: logger
115+
)
103116

104-
pollingSystem.yield(messageResult)
105-
},
106-
logger: self.logger
117+
let producer = try await KafkaProducer(
118+
client: client,
119+
pollingSystem: nil, // we don't receive acknowlegements so no pollingSystem needed
120+
topicConfig: topicConfig,
121+
logger: logger
107122
)
123+
124+
return producer
108125
}
109126

110-
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaAsyncSequence`` that can be used
127+
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaMessageAcknowledgements`` `AsyncSequence` that can be used
111128
/// to receive message acknowlegements.
112129
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
113130
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
114131
/// - Parameter logger: A logger.
115-
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaAsyncSequence``
116-
/// used for receiving message acknowledgements.
132+
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
133+
/// `AsyncSequence` used for receiving message acknowledgements.
117134
/// - Throws: A ``KafkaError`` if initializing the producer failed.
118-
public static func newProducer(
135+
public static func newProducerWithAcknowledgements(
119136
config: KafkaProducerConfig = KafkaProducerConfig(),
120137
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
121138
logger: Logger
122-
) async throws -> (KafkaProducer, KafkaAsyncSequence<Acknowledgement>) {
139+
) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
140+
let pollingSystem = KafkaPollingSystem<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>()
141+
let client = try RDKafka.createClient(
142+
type: .producer,
143+
configDictionary: config.dictionary,
144+
callback: { [logger, pollingSystem] messageResult in
145+
guard let messageResult else {
146+
logger.error("Could not resolve acknowledged message")
147+
return
148+
}
149+
150+
pollingSystem.yield(messageResult)
151+
},
152+
logger: logger
153+
)
154+
123155
let producer = try await KafkaProducer(
124-
config: config,
156+
client: client,
157+
pollingSystem: pollingSystem,
125158
topicConfig: topicConfig,
126159
logger: logger
127160
)
@@ -132,8 +165,7 @@ public actor KafkaProducer {
132165
highWatermark: 50
133166
)
134167

135-
let client = producer.client
136-
let sequence = producer.pollingSystem.initialize(
168+
let _sequence = pollingSystem.initialize(
137169
backPressureStrategy: backPressureStrategy,
138170
pollClosure: { [client] in
139171
client.withKafkaHandlePointer { handle in
@@ -142,8 +174,9 @@ public actor KafkaProducer {
142174
return
143175
}
144176
)
177+
let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: _sequence)
145178

146-
return (producer, sequence)
179+
return (producer, acknowlegementsSequence)
147180
}
148181

149182
/// Method to shutdown the ``KafkaProducer``.
@@ -163,15 +196,15 @@ public actor KafkaProducer {
163196

164197
private func _shutDownGracefully(timeout: Int32) async {
165198
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
166-
// Wait 10 seconds for outstanding messages to be sent and callbacks to be called
199+
// Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
167200
self.client.withKafkaHandlePointer { handle in
168201
rd_kafka_flush(handle, timeout)
169202
continuation.resume()
170203
}
171204
}
172205

173206
// Kill poll loop in polling system
174-
self.pollingSystem.terminate()
207+
self.pollingSystem?.terminate()
175208

176209
for (_, topicHandle) in self.topicHandles {
177210
rd_kafka_topic_destroy(topicHandle)
@@ -186,7 +219,10 @@ public actor KafkaProducer {
186219
/// - Returns: An awaitable task representing the execution of the poll loop.
187220
public func run(pollInterval: Duration = .milliseconds(100)) async throws {
188221
// TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
189-
try await self.pollingSystem.run(pollInterval: pollInterval)
222+
guard let pollingSystem else {
223+
fatalError("Method \(#function) should only be used with the acknowledgement receiving producer.")
224+
}
225+
try await pollingSystem.run(pollInterval: pollInterval)
190226
}
191227

192228
/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ final class SwiftKafkaTests: XCTestCase {
7070

7171
func testProduceAndConsumeWithConsumerGroup() async throws {
7272
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
73-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
73+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest)
7474

7575
await withThrowingTaskGroup(of: Void.self) { group in
7676
// Run Task
@@ -122,7 +122,7 @@ final class SwiftKafkaTests: XCTestCase {
122122

123123
func testProduceAndConsumeWithAssignedTopicPartition() async throws {
124124
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
125-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
125+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest)
126126

127127
await withThrowingTaskGroup(of: Void.self) { group in
128128
// Run Task
@@ -175,7 +175,7 @@ final class SwiftKafkaTests: XCTestCase {
175175

176176
func testProduceAndConsumeWithCommitSync() async throws {
177177
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
178-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
178+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.producerConfig, logger: .kafkaTest)
179179

180180
await withThrowingTaskGroup(of: Void.self) { group in
181181
// Run Task
@@ -245,7 +245,7 @@ final class SwiftKafkaTests: XCTestCase {
245245

246246
private static func sendAndAcknowledgeMessages(
247247
producer: KafkaProducer,
248-
acknowledgements: KafkaAsyncSequence<KafkaProducer.Acknowledgement>,
248+
acknowledgements: KafkaMessageAcknowledgements,
249249
messages: [KafkaProducerMessage]
250250
) async throws {
251251
var messageIDs = Set<UInt>()

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ final class KafkaProducerTests: XCTestCase {
5252
}
5353

5454
func testSendAsync() async throws {
55-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
55+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
5656

5757
await withThrowingTaskGroup(of: Void.self) { group in
5858

@@ -91,7 +91,7 @@ final class KafkaProducerTests: XCTestCase {
9191
}
9292

9393
func testSendAsyncEmptyMessage() async throws {
94-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
94+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
9595

9696
await withThrowingTaskGroup(of: Void.self) { group in
9797

@@ -129,7 +129,7 @@ final class KafkaProducerTests: XCTestCase {
129129
}
130130

131131
func testSendAsyncTwoTopics() async throws {
132-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
132+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
133133
await withThrowingTaskGroup(of: Void.self) { group in
134134

135135
// Run Task
@@ -185,7 +185,7 @@ final class KafkaProducerTests: XCTestCase {
185185
}
186186

187187
func testProducerNotUsableAfterShutdown() async throws {
188-
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
188+
let (producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
189189
await producer.shutdownGracefully()
190190

191191
await withThrowingTaskGroup(of: Void.self) { group in
@@ -209,7 +209,7 @@ final class KafkaProducerTests: XCTestCase {
209209

210210
// This subscribes to the acknowledgements stream and immediately terminates the stream.
211211
// Required to kill the run task.
212-
var iterator: KafkaAsyncSequence<KafkaProducer.Acknowledgement>.AsyncIterator? = acks.makeAsyncIterator()
212+
var iterator: KafkaMessageAcknowledgements.AsyncIterator? = acks.makeAsyncIterator()
213213
_ = iterator
214214
iterator = nil
215215
}
@@ -218,8 +218,8 @@ final class KafkaProducerTests: XCTestCase {
218218

219219
func testNoMemoryLeakAfterShutdown() async throws {
220220
var producer: KafkaProducer?
221-
var acks: KafkaAsyncSequence<KafkaProducer.Acknowledgement>
222-
(producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
221+
var acks: KafkaMessageAcknowledgements
222+
(producer, acks) = try await KafkaProducer.newProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
223223
_ = acks
224224

225225
weak var producerCopy = producer

0 commit comments

Comments
 (0)