
Commit f8cb0a0

KafkaProducer: events instead of acks sequence (#96)
* `KafkaProducer`: event instead of acks sequence

Motivation:

`KafkaProducer`: we want to expose a general `KafkaProducerEvent` type in the `AsyncSequence` that was formerly just for message acknowledgements. This enables us to add more events such as statistics in the future.

The reason behind having a single `AsyncSequence` for all event types instead of having separate `AsyncSequence`s for each event type is that we need to ensure that all events we subscribe to in `librdkafka` are actually consumed by the `AsyncSequence` in our `KafkaProducer` type. Otherwise we could run out of memory. Now, by giving the user the entire events `AsyncSequence`, they decide whether they want to consume an event or drop it.

> **Note**: Logs will be consumed regardless of the `KafkaProducerEvents` `AsyncSequence`.

Modifications:

* create a new `enum` `KafkaProducerEvent`
* rename `KafkaMessageAcknowledgements` -> `KafkaProducerEvents`
* rename `KafkaProducer.makeProducerWithAcknowledgements` -> `.makeProducerWithEvents`
* update tests
* update README

* Review Franz

Modifications:

* fix documentation typos
* `KafkaProducerEvent`: replace factory method with `init`
* create new type `KafkaProducerMessageStatus`, effectively representing whether a `KafkaProducerMessage` was acknowledged or not
* fix README nit

* `KafkaDeliveryReport` type

Modifications:

* create new type `KafkaDeliveryReport` containing a message's status and its id
* remove ID from `KafkaAcknowledgedMessage`
* remove `KafkaAcknowledgedMessageError`
* rename `KafkaProducerEvent.deliveryReport` -> `.deliveryReports`
* update README
* update tests

* Fix documentation nits

* Review Gus

Modifications:

* fix typos
* rename `KafkaProducerEvents.KafkaProducerEventAsyncIterator` -> `KafkaProducerEvents.AsyncIterator`

* Fix typo
* Fix (letter) case nit

Co-authored-by: Franz Busch <[email protected]>
1 parent 5b07fe2 commit f8cb0a0
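
The `KafkaProducerEvent` enum this commit introduces is not among the file diffs shown below. A minimal sketch of its likely shape, inferred from the commit message and the README diff; only the `deliveryReports` case is visible on this page, so the conformances and anything beyond that case are assumptions:

```swift
// Hypothetical sketch of the new event type, not the actual file from this commit.
// `.deliveryReports` is the only case confirmed by the diffs below; later
// additions such as statistics would become further cases.
public enum KafkaProducerEvent: Sendable, Hashable {
    /// Delivery reports received while polling `librdkafka`.
    case deliveryReports([KafkaDeliveryReport])
}
```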

11 files changed (+239 / -217 lines)

README.md

Lines changed: 10 additions & 5 deletions
@@ -29,14 +29,14 @@ Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https
 
 ### Producer API
 
-The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
+The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `events` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
 
 ```swift
 let broker = KafkaConfiguration.Broker(host: "localhost", port: 9092)
 var config = KafkaProducerConfiguration()
 config.bootstrapServers = [broker]
 
-let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
+let (producer, events) = try KafkaProducer.makeProducerWithEvents(
     config: config,
     logger: logger
 )
@@ -53,7 +53,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
        try await serviceGroup.run()
     }
 
-    // Task receiving acknowledgements
+    // Task sending message and receiving events
     group.addTask {
         let messageID = try producer.send(
             KafkaProducerMessage(
@@ -62,8 +62,13 @@ await withThrowingTaskGroup(of: Void.self) { group in
             )
         )
 
-        for await acknowledgement in acknowledgements {
-            // Check if acknowledgement belongs to the sent message
+        for await event in events {
+            switch event {
+            case .deliveryReports(let deliveryReports):
+                // Check what messages the delivery reports belong to
+            default:
+                break // Ignore any other events
+            }
         }
     }
 }
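
To make the placeholder comment in the README example concrete, here is one way a caller might correlate the `messageID` returned by `send(_:)` with incoming delivery reports. This is a usage sketch building on the example above, not code from the commit:

```swift
// Sketch: match delivery reports against the ID returned by send(_:).
// Assumes `events` and `messageID` from the README example above and
// runs inside an async context.
for await event in events {
    switch event {
    case .deliveryReports(let reports):
        for report in reports where report.id == messageID {
            switch report.status {
            case .acknowledged(let message):
                print("Delivered to topic \(message.topic)")
            case .failure(let error):
                print("Delivery failed: \(error)")
            }
        }
    default:
        break // Ignore any other events
    }
}
```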

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ public struct KafkaProducerConfiguration {
     // MARK: - SwiftKafka-specific Config properties
 
     /// The time between two consecutive polls.
-    /// Effectively controls the rate at which incoming events and acknowledgments are consumed.
+    /// Effectively controls the rate at which incoming events are consumed.
     /// Default: `.milliseconds(100)`
     public var pollInterval: Duration = .milliseconds(100)
 
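The poll interval is the knob that paces the producer's run loop (see the `KafkaProducer.run()` changes further down). A small configuration sketch; the 50 ms value is arbitrary:

```swift
// Sketch: tuning how often the producer polls librdkafka for events.
// 50 ms is an arbitrary example value; the default remains 100 ms.
var config = KafkaProducerConfiguration()
config.pollInterval = .milliseconds(50)
```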
Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 2 additions & 8 deletions
@@ -17,10 +17,6 @@ import NIOCore
 
 /// A message produced by the client and acknowledged by the Kafka cluster.
 public struct KafkaAcknowledgedMessage {
-    /// The unique identifier assigned by the ``KafkaProducer`` when the message was send to Kafka.
-    /// The same identifier is returned by ``KafkaProducer/send(_:)`` and can be used to correlate
-    /// a sent message and an acknowledged message.
-    public var id: KafkaProducerMessageID
     /// The topic that the message was sent to.
     public var topic: String
     /// The partition that the message was sent to.
@@ -34,16 +30,14 @@ public struct KafkaAcknowledgedMessage {
 
     /// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
     /// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
-    init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: KafkaProducerMessageID) throws {
-        self.id = id
-
+    init(messagePointer: UnsafePointer<rd_kafka_message_t>) throws {
         let rdKafkaMessage = messagePointer.pointee
 
         let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)
         self.value = ByteBuffer(bytes: valueBufferPointer)
 
         guard rdKafkaMessage.err == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            throw KafkaAcknowledgedMessageError.fromRDKafkaError(messageID: self.id, error: rdKafkaMessage.err)
+            throw KafkaError.rdKafkaError(wrapping: rdKafkaMessage.err)
         }
 
         guard let topic = String(validatingUTF8: rd_kafka_topic_name(rdKafkaMessage.rkt)) else {

Sources/SwiftKafka/KafkaAcknowledgedMessageError.swift

Lines changed: 0 additions & 79 deletions
This file was deleted.

Sources/SwiftKafka/KafkaConsumerMessage.swift

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ public struct KafkaConsumerMessage {
         let rdKafkaMessage = messagePointer.pointee
 
         guard let valuePointer = rdKafkaMessage.payload else {
-            fatalError("Could not resolve payload of acknowledged message")
+            fatalError("Could not resolve payload of consumer message")
         }
 
         let valueBufferPointer = UnsafeRawBufferPointer(start: valuePointer, count: rdKafkaMessage.len)

Sources/SwiftKafka/KafkaDeliveryReport.swift

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Crdkafka
+
+/// A delivery report for a message that was sent to the Kafka cluster.
+public struct KafkaDeliveryReport: Sendable, Hashable {
+    public enum Status: Sendable, Hashable {
+        /// The message has been successfully acknowledged by the Kafka cluster.
+        case acknowledged(message: KafkaAcknowledgedMessage)
+        /// The message failed to be acknowledged by the Kafka cluster and encountered an error.
+        case failure(KafkaError)
+    }
+
+    /// The status of a Kafka producer message after attempting to send it.
+    public var status: Status
+
+    /// The unique identifier assigned by the ``KafkaProducer`` when the message was sent to Kafka.
+    /// The same identifier is returned by ``KafkaProducer/send(_:)`` and can be used to correlate
+    /// a sent message with a delivery report.
+    public var id: KafkaProducerMessageID
+
+    internal init?(messagePointer: UnsafePointer<rd_kafka_message_t>?) {
+        guard let messagePointer else {
+            return nil
+        }
+
+        self.id = KafkaProducerMessageID(rawValue: UInt(bitPattern: messagePointer.pointee._private))
+
+        do {
+            let message = try KafkaAcknowledgedMessage(messagePointer: messagePointer)
+            self.status = .acknowledged(message: message)
+        } catch {
+            guard let error = error as? KafkaError else {
+                fatalError("Caught error that is not of type \(KafkaError.self)")
+            }
+            self.status = .failure(error)
+        }
+    }
+}
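
Since the message ID now lives on `KafkaDeliveryReport` rather than on `KafkaAcknowledgedMessage`, per-message bookkeeping can key off the report itself. A sketch of that pattern; `pendingSends` and the helper functions are hypothetical and not part of this commit:

```swift
// Sketch: track in-flight sends by the ID returned from send(_:) and clear
// them when their delivery reports arrive, whether each report is
// .acknowledged or .failure. Assumes KafkaProducerMessageID is Hashable
// (it is stored inside the Hashable KafkaDeliveryReport above).
var pendingSends: Set<KafkaProducerMessageID> = []

func noteSent(_ id: KafkaProducerMessageID) {
    pendingSends.insert(id)
}

func handle(_ reports: [KafkaDeliveryReport]) {
    for report in reports {
        pendingSends.remove(report.id)
    }
}
```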

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 32 additions & 36 deletions
@@ -41,26 +41,26 @@ extension KafkaProducerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
     }
 }
 
-// MARK: - KafkaMessageAcknowledgements
+// MARK: - KafkaProducerEvents
 
-/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
-public struct KafkaMessageAcknowledgements: AsyncSequence {
-    public typealias Element = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
+/// `AsyncSequence` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
+public struct KafkaProducerEvents: AsyncSequence {
+    public typealias Element = KafkaProducerEvent
     typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
     typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaProducerCloseOnTerminate>
     let wrappedSequence: WrappedSequence
 
-    /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
-    public struct AcknowledgedMessagesAsyncIterator: AsyncIteratorProtocol {
+    /// `AsynceIteratorProtocol` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
+    public struct AsyncIterator: AsyncIteratorProtocol {
         var wrappedIterator: WrappedSequence.AsyncIterator
 
         public mutating func next() async -> Element? {
             await self.wrappedIterator.next()
         }
     }
 
-    public func makeAsyncIterator() -> AcknowledgedMessagesAsyncIterator {
-        return AcknowledgedMessagesAsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
+    public func makeAsyncIterator() -> AsyncIterator {
+        return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
     }
 }
 
@@ -72,7 +72,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {
 /// configuration object (only works if server has `auto.create.topics.enable` property set).
 public final class KafkaProducer: Service, Sendable {
     typealias Producer = NIOAsyncSequenceProducer<
-        Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>,
+        KafkaProducerEvent,
         NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
         KafkaProducerCloseOnTerminate
     >
@@ -104,7 +104,7 @@ public final class KafkaProducer: Service, Sendable {
 
     /// Initialize a new ``KafkaProducer``.
     ///
-    /// This factory method creates a producer without message acknowledgements.
+    /// This factory method creates a producer without listening for events.
     ///
     /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
     /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
@@ -141,28 +141,28 @@ public final class KafkaProducer: Service, Sendable {
         return producer
     }
 
-    /// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
+    /// Initialize a new ``KafkaProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.
     ///
-    /// Use the asynchronous sequence to consume message acknowledgements.
+    /// Use the asynchronous sequence to consume events.
     ///
     /// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
-    /// Additionally, make sure to consume the asynchronous sequence otherwise the acknowledgements will be buffered in memory indefinitely.
+    /// Additionally, make sure to consume the asynchronous sequence otherwise the events will be buffered in memory indefinitely.
     ///
     /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
     /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
     /// - Parameter logger: A logger.
-    /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
-    /// `AsyncSequence` used for receiving message acknowledgements.
+    /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaProducerEvents``
+    /// `AsyncSequence` used for receiving message events.
     /// - Throws: A ``KafkaError`` if initializing the producer failed.
-    public static func makeProducerWithAcknowledgements(
+    public static func makeProducerWithEvents(
         config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
         topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
         logger: Logger
-    ) throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
+    ) throws -> (KafkaProducer, KafkaProducerEvents) {
         let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
 
         let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
-            elementType: Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>.self,
+            elementType: KafkaProducerEvent.self,
             backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
             delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine)
         )
@@ -188,11 +188,11 @@ public final class KafkaProducer: Service, Sendable {
             )
         }
 
-        let acknowlegementsSequence = KafkaMessageAcknowledgements(wrappedSequence: sourceAndSequence.sequence)
-        return (producer, acknowlegementsSequence)
+        let eventsSequence = KafkaProducerEvents(wrappedSequence: sourceAndSequence.sequence)
+        return (producer, eventsSequence)
     }
 
-    /// Start polling Kafka for acknowledged messages.
+    /// Start polling Kafka for events.
     ///
     /// - Returns: An awaitable task representing the execution of the poll loop.
     public func run() async throws {
@@ -208,18 +208,14 @@
             let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
             switch nextAction {
             case .pollWithoutYield(let client):
-                // Drop any incoming acknowledgments
+                // Drop any incoming events
                 let _ = client.eventPoll()
             case .pollAndYield(let client, let source):
                 let events = client.eventPoll()
                 for event in events {
-                    switch event {
-                    case .deliveryReport(let results):
-                        // Ignore YieldResult as we don't support back pressure in KafkaProducer
-                        results.forEach { _ = source?.yield($0) }
-                    default:
-                        break // Ignore
-                    }
+                    let producerEvent = KafkaProducerEvent(event)
+                    // Ignore YieldResult as we don't support back pressure in KafkaProducer
+                    _ = source?.yield(producerEvent)
                 }
                 try await Task.sleep(for: self.config.pollInterval)
             case .flushFinishSourceAndTerminatePollLoop(let client, let source):
@@ -292,8 +288,8 @@ extension KafkaProducer {
             source: Producer.Source?,
             topicHandles: RDKafkaTopicHandles
         )
-        /// Producer is still running but the acknowledgement asynchronous sequence was terminated.
-        /// All incoming acknowledgements will be dropped.
+        /// Producer is still running but the event asynchronous sequence was terminated.
+        /// All incoming events will be dropped.
         ///
         /// - Parameter client: Client used for handling the connection to the Kafka cluster.
         case consumptionStopped(client: RDKafkaClient)
@@ -336,7 +332,7 @@ extension KafkaProducer {
         ///
         /// - Parameter client: Client used for handling the connection to the Kafka cluster.
         case pollWithoutYield(client: RDKafkaClient)
-        /// Poll client and yield acknowledgments if any received.
+        /// Poll client and yield events if any received.
         ///
         /// - Parameter client: Client used for handling the connection to the Kafka cluster.
         /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -403,7 +399,7 @@ extension KafkaProducer {
                     topicHandles: topicHandles
                 )
             case .consumptionStopped:
-                throw KafkaError.connectionClosed(reason: "Sequence consuming acknowledgements was abruptly terminated, producer closed")
+                throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed")
             case .finishing:
                 throw KafkaError.connectionClosed(reason: "Producer in the process of finishing")
             case .finished:
@@ -419,8 +415,8 @@ extension KafkaProducer {
            case finishSource(source: Producer.Source?)
        }
 
-        /// The acknowledgements asynchronous sequence was terminated.
-        /// All incoming acknowledgements will be dropped.
+        /// The events asynchronous sequence was terminated.
+        /// All incoming events will be dropped.
        mutating func stopConsuming() -> StopConsumingAction? {
            switch self.state {
            case .uninitialized:
@@ -431,7 +427,7 @@ extension KafkaProducer {
                self.state = .consumptionStopped(client: client)
                return .finishSource(source: source)
            case .finishing(let client, let source):
-                // Setting source to nil to prevent incoming acknowledgements from buffering in `source`
+                // Setting source to nil to prevent incoming events from buffering in `source`
                self.state = .finishing(client: client, source: nil)
                return .finishSource(source: source)
            case .finished:
