Skip to content

Commit 9c34dab

Browse files
committed
Revert "Revert "Create KafkaProducer with factory method""
This reverts commit 95c8dda.
1 parent cc0e14c commit 9c34dab

File tree

4 files changed

+67
-27
lines changed

4 files changed

+67
-27
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can late
1111
```swift
1212
let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])
1313

14-
let producer = try await KafkaProducer(
14+
let (producer, acknowledgements) = try await KafkaProducer.newProducer(
1515
config: config,
1616
logger: .kafkaTest // Your logger here
1717
)
@@ -32,7 +32,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
3232
)
3333
)
3434

35-
for await acknowledgement in producer.acknowledgements {
35+
for await acknowledgement in acknowledgements {
3636
// Check if acknowledgement belongs to the sent message
3737
}
3838

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,15 @@ public actor KafkaProducer {
7474
/// Used for handling the connection to the Kafka cluster.
7575
private let client: KafkaClient
7676

77-
/// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
78-
/// acknowledged by the Kafka cluster.
79-
public nonisolated let acknowledgements: KafkaMessageAcknowledgements
80-
77+
// Private. `KafkaProducer.newProducer` should be used to create a new producer.
8178
/// Initialize a new ``KafkaProducer``.
8279
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
8380
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
8481
/// - Parameter logger: A logger.
85-
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
86-
public init(
87-
config: KafkaProducerConfig = KafkaProducerConfig(),
88-
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
82+
/// - Throws: A ``KafkaError`` if initializing the producer failed.
83+
private init(
84+
config: KafkaProducerConfig,
85+
topicConfig: KafkaTopicConfig,
8986
logger: Logger
9087
) async throws {
9188
self.topicConfig = topicConfig
@@ -108,14 +105,35 @@ public actor KafkaProducer {
108105
},
109106
logger: self.logger
110107
)
108+
}
109+
110+
/// Initialize a new ``KafkaProducer`` alongside a ``KafkaAsyncSequence`` that can be used
111+
/// to receive message acknowlegements.
112+
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
113+
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
114+
/// - Parameter logger: A logger.
115+
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaAsyncSequence``
116+
/// used for receiving message acknowledgements.
117+
/// - Throws: A ``KafkaError`` if initializing the producer failed.
118+
public static func newProducer(
119+
config: KafkaProducerConfig = KafkaProducerConfig(),
120+
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
121+
logger: Logger
122+
) async throws -> (KafkaProducer, KafkaAsyncSequence<Acknowledgement>) {
123+
let producer = try await KafkaProducer(
124+
config: config,
125+
topicConfig: topicConfig,
126+
logger: logger
127+
)
111128

112129
// TODO(felix): this should be injected through config
113130
let backPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
114131
lowWatermark: 10,
115132
highWatermark: 50
116133
)
117134

118-
let sequence = self.pollingSystem.initialize(
135+
let client = producer.client
136+
let sequence = producer.pollingSystem.initialize(
119137
backPressureStrategy: backPressureStrategy,
120138
pollClosure: { [client] in
121139
client.withKafkaHandlePointer { handle in
@@ -124,7 +142,8 @@ public actor KafkaProducer {
124142
return
125143
}
126144
)
127-
self.acknowledgements = KafkaMessageAcknowledgements(wrappedSequence: sequence)
145+
146+
return (producer, sequence)
128147
}
129148

130149
/// Method to shutdown the ``KafkaProducer``.

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ final class SwiftKafkaTests: XCTestCase {
7070

7171
func testProduceAndConsumeWithConsumerGroup() async throws {
7272
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
73-
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
73+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
7474

7575
await withThrowingTaskGroup(of: Void.self) { group in
7676
// Run Task
@@ -80,7 +80,11 @@ final class SwiftKafkaTests: XCTestCase {
8080

8181
// Producer Task
8282
group.addTask {
83-
try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages)
83+
try await Self.sendAndAcknowledgeMessages(
84+
producer: producer,
85+
acknowledgements: acks,
86+
messages: testMessages
87+
)
8488
await producer.shutdownGracefully()
8589
}
8690

@@ -118,7 +122,7 @@ final class SwiftKafkaTests: XCTestCase {
118122

119123
func testProduceAndConsumeWithAssignedTopicPartition() async throws {
120124
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
121-
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
125+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
122126

123127
await withThrowingTaskGroup(of: Void.self) { group in
124128
// Run Task
@@ -128,7 +132,11 @@ final class SwiftKafkaTests: XCTestCase {
128132

129133
// Producer Task
130134
group.addTask {
131-
try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages)
135+
try await Self.sendAndAcknowledgeMessages(
136+
producer: producer,
137+
acknowledgements: acks,
138+
messages: testMessages
139+
)
132140
await producer.shutdownGracefully()
133141
}
134142

@@ -167,7 +175,7 @@ final class SwiftKafkaTests: XCTestCase {
167175

168176
func testProduceAndConsumeWithCommitSync() async throws {
169177
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
170-
let producer = try await KafkaProducer(config: producerConfig, logger: .kafkaTest)
178+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.producerConfig, logger: .kafkaTest)
171179

172180
await withThrowingTaskGroup(of: Void.self) { group in
173181
// Run Task
@@ -177,7 +185,11 @@ final class SwiftKafkaTests: XCTestCase {
177185

178186
// Producer Task
179187
group.addTask {
180-
try await Self.sendAndAcknowledgeMessages(producer: producer, messages: testMessages)
188+
try await Self.sendAndAcknowledgeMessages(
189+
producer: producer,
190+
acknowledgements: acks,
191+
messages: testMessages
192+
)
181193
await producer.shutdownGracefully()
182194
}
183195

@@ -233,6 +245,7 @@ final class SwiftKafkaTests: XCTestCase {
233245

234246
private static func sendAndAcknowledgeMessages(
235247
producer: KafkaProducer,
248+
acknowledgements: KafkaAsyncSequence<KafkaProducer.Acknowledgement>,
236249
messages: [KafkaProducerMessage]
237250
) async throws {
238251
var messageIDs = Set<UInt>()
@@ -243,7 +256,7 @@ final class SwiftKafkaTests: XCTestCase {
243256

244257
var acknowledgedMessages = Set<KafkaAcknowledgedMessage>()
245258

246-
for await messageResult in producer.acknowledgements {
259+
for await messageResult in acknowledgements {
247260
guard case .success(let acknowledgedMessage) = messageResult else {
248261
XCTFail()
249262
return

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ final class KafkaProducerTests: XCTestCase {
5252
}
5353

5454
func testSendAsync() async throws {
55-
let producer = try await KafkaProducer(config: config, logger: .kafkaTest)
55+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
5656

5757
await withThrowingTaskGroup(of: Void.self) { group in
5858

@@ -72,7 +72,7 @@ final class KafkaProducerTests: XCTestCase {
7272

7373
let messageID = try await producer.sendAsync(message)
7474

75-
for await messageResult in producer.acknowledgements {
75+
for await messageResult in acks {
7676
guard case .success(let acknowledgedMessage) = messageResult else {
7777
XCTFail()
7878
return
@@ -91,7 +91,7 @@ final class KafkaProducerTests: XCTestCase {
9191
}
9292

9393
func testSendAsyncEmptyMessage() async throws {
94-
let producer = try await KafkaProducer(config: config, logger: .kafkaTest)
94+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
9595

9696
await withThrowingTaskGroup(of: Void.self) { group in
9797

@@ -110,7 +110,7 @@ final class KafkaProducerTests: XCTestCase {
110110

111111
let messageID = try await producer.sendAsync(message)
112112

113-
for await messageResult in producer.acknowledgements {
113+
for await messageResult in acks {
114114
guard case .success(let acknowledgedMessage) = messageResult else {
115115
XCTFail()
116116
return
@@ -129,7 +129,7 @@ final class KafkaProducerTests: XCTestCase {
129129
}
130130

131131
func testSendAsyncTwoTopics() async throws {
132-
let producer = try await KafkaProducer(config: config, logger: .kafkaTest)
132+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
133133
await withThrowingTaskGroup(of: Void.self) { group in
134134

135135
// Run Task
@@ -157,7 +157,7 @@ final class KafkaProducerTests: XCTestCase {
157157

158158
var acknowledgedMessages = Set<KafkaAcknowledgedMessage>()
159159

160-
for await messageResult in producer.acknowledgements {
160+
for await messageResult in acks {
161161
guard case .success(let acknowledgedMessage) = messageResult else {
162162
XCTFail()
163163
return
@@ -185,7 +185,7 @@ final class KafkaProducerTests: XCTestCase {
185185
}
186186

187187
func testProducerNotUsableAfterShutdown() async throws {
188-
let producer = try await KafkaProducer(config: config, logger: .kafkaTest)
188+
let (producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
189189
await producer.shutdownGracefully()
190190

191191
await withThrowingTaskGroup(of: Void.self) { group in
@@ -206,13 +206,21 @@ final class KafkaProducerTests: XCTestCase {
206206
try await producer.sendAsync(message)
207207
XCTFail("Method should have thrown error")
208208
} catch {}
209+
210+
// This subscribes to the acknowledgements stream and immediately terminates the stream.
211+
// Required to kill the run task.
212+
var iterator: KafkaAsyncSequence<KafkaProducer.Acknowledgement>.AsyncIterator? = acks.makeAsyncIterator()
213+
_ = iterator
214+
iterator = nil
209215
}
210216
}
211217
}
212218

213219
func testNoMemoryLeakAfterShutdown() async throws {
214220
var producer: KafkaProducer?
215-
producer = try await KafkaProducer(config: self.config, logger: .kafkaTest)
221+
var acks: KafkaAsyncSequence<KafkaProducer.Acknowledgement>
222+
(producer, acks) = try await KafkaProducer.newProducer(config: self.config, logger: .kafkaTest)
223+
_ = acks
216224

217225
weak var producerCopy = producer
218226

0 commit comments

Comments
 (0)