Skip to content

Commit 071f51d

Browse files
KafkaConsumer: expose event sequence (#97)
* `KafkaConsumer`: expose event sequence Motivation: Like in `KafkaProducer` (#96), we want to expose an asynchronous sequence that is able to emit all sorts of events in the future (e.g. rebalance events for the `KafkaConsumer`). Modifications: * add a new type `KafkaConsumerEvent` (currently empty) * make `KafkaConsumer.init` private * add factory methods `KafkaConsumer.createConsumer` and `KafkaConsumer.createConsumerWithEvents` * create a new `AsyncSequence`: `KafkaConsumerEvents` * update README * update tests * `AsyncIterator` rename Modifications: * rename to `KafkaConsumerEvents.AsyncIterator` * rename to `KafkaConsumerMessages.AsyncIterator` * Review Gus: missing documentation * Review Franz Modifications: * make `KafkaConsumerEvents` `Sendable` * make `KafkaProducerEvents` `Sendable` * replace `KafkaConsumer.makeConsumer` with `convenience init` * replace `KafkaProducer.makeProducer` with `convenience init` * Fix post-rebase issue * Remove outdated comments
1 parent a3df8d8 commit 071f51d

File tree

4 files changed

+161
-34
lines changed

4 files changed

+161
-34
lines changed

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 122 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,29 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
3535
}
3636
}
3737

38+
// MARK: - KafkaConsumerEvents

/// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
public struct KafkaConsumerEvents: Sendable, AsyncSequence {
    public typealias Element = KafkaConsumerEvent
    typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerCloseOnTerminate>
    // The NIO sequence this type forwards to; events are produced elsewhere and only read here.
    let wrappedSequence: WrappedSequence

    /// `AsyncIteratorProtocol` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
    public struct AsyncIterator: AsyncIteratorProtocol {
        var wrappedIterator: WrappedSequence.AsyncIterator

        /// Returns the next event, or `nil` once the underlying sequence terminates.
        public mutating func next() async -> Element? {
            await self.wrappedIterator.next()
        }
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
    }
}
60+
3861
// MARK: - KafkaConsumerMessages
3962

4063
/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
@@ -52,7 +75,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
5275
let wrappedSequence: WrappedSequence
5376

5477
/// `AsyncIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
55-
public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol {
78+
public struct AsyncIterator: AsyncIteratorProtocol {
5679
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
5780
var wrappedIterator: WrappedSequence.AsyncIterator?
5881

@@ -80,8 +103,8 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
80103
}
81104
}
82105

83-
public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator {
84-
return ConsumerMessagesAsyncIterator(
106+
public func makeAsyncIterator() -> AsyncIterator {
107+
return AsyncIterator(
85108
stateMachine: self.stateMachine,
86109
wrappedIterator: self.wrappedSequence.makeAsyncIterator()
87110
)
@@ -108,34 +131,27 @@ public final class KafkaConsumer: Sendable, Service {
108131
/// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives.
109132
public let messages: KafkaConsumerMessages
110133

134+
// Private initializer, use factory method or convenience init to create KafkaConsumer
111135
/// Initialize a new ``KafkaConsumer``.
112136
/// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)``
113137
/// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``.
114-
/// - Parameter config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
115-
/// - Parameter logger: A logger.
138+
///
139+
/// - Parameters:
140+
/// - client: Client used for handling the connection to the Kafka cluster.
141+
/// - stateMachine: The state machine containing the state of the ``KafkaConsumer``.
142+
/// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
143+
/// - logger: A logger.
116144
/// - Throws: A ``KafkaError`` if the initialization failed.
117-
public init(
145+
private init(
146+
client: RDKafkaClient,
147+
stateMachine: NIOLockedValueBox<StateMachine>,
118148
config: KafkaConsumerConfiguration,
119149
logger: Logger
120150
) throws {
121151
self.config = config
152+
self.stateMachine = stateMachine
122153
self.logger = logger
123154

124-
var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
125-
// Only listen to offset commit events when autoCommit is false
126-
if self.config.enableAutoCommit == false {
127-
subscribedEvents.append(.offsetCommit)
128-
}
129-
130-
let client = try RDKafkaClient.makeClient(
131-
type: .consumer,
132-
configDictionary: config.dictionary,
133-
events: subscribedEvents,
134-
logger: logger
135-
)
136-
137-
self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger))
138-
139155
let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
140156
elementType: KafkaConsumerMessage.self,
141157
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
@@ -166,6 +182,91 @@ public final class KafkaConsumer: Sendable, Service {
166182
}
167183
}
168184

185+
/// Initialize a new ``KafkaConsumer``.
186+
///
187+
/// This creates a consumer without that does not listen to any events other than consumer messages.
188+
///
189+
/// - Parameters:
190+
/// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
191+
/// - logger: A logger.
192+
/// - Returns: The newly created ``KafkaConsumer``.
193+
/// - Throws: A ``KafkaError`` if the initialization failed.
194+
public convenience init(
195+
config: KafkaConsumerConfiguration,
196+
logger: Logger
197+
) throws {
198+
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
199+
200+
var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
201+
// Only listen to offset commit events when autoCommit is false
202+
if config.enableAutoCommit == false {
203+
subscribedEvents.append(.offsetCommit)
204+
}
205+
206+
let client = try RDKafkaClient.makeClient(
207+
type: .consumer,
208+
configDictionary: config.dictionary,
209+
events: subscribedEvents,
210+
logger: logger
211+
)
212+
213+
try self.init(
214+
client: client,
215+
stateMachine: stateMachine,
216+
config: config,
217+
logger: logger
218+
)
219+
}
220+
221+
/// Initialize a new ``KafkaConsumer`` and a ``KafkaConsumerEvents`` asynchronous sequence.
222+
///
223+
/// Use the asynchronous sequence to consume events.
224+
///
225+
/// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
226+
/// Additionally, make sure to consume the asynchronous sequence otherwise the events will be buffered in memory indefinitely.
227+
///
228+
/// - Parameters:
229+
/// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
230+
/// - logger: A logger.
231+
/// - Returns: A tuple containing the created ``KafkaConsumer`` and the ``KafkaConsumerEvents``
232+
/// `AsyncSequence` used for receiving message events.
233+
/// - Throws: A ``KafkaError`` if the initialization failed.
234+
public static func makeConsumerWithEvents(
235+
config: KafkaConsumerConfiguration,
236+
logger: Logger
237+
) throws -> (KafkaConsumer, KafkaConsumerEvents) {
238+
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
239+
240+
var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
241+
// Only listen to offset commit events when autoCommit is false
242+
if config.enableAutoCommit == false {
243+
subscribedEvents.append(.offsetCommit)
244+
}
245+
246+
let client = try RDKafkaClient.makeClient(
247+
type: .consumer,
248+
configDictionary: config.dictionary,
249+
events: subscribedEvents,
250+
logger: logger
251+
)
252+
253+
let consumer = try KafkaConsumer(
254+
client: client,
255+
stateMachine: stateMachine,
256+
config: config,
257+
logger: logger
258+
)
259+
260+
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
261+
elementType: KafkaConsumerEvent.self,
262+
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
263+
delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine)
264+
)
265+
266+
let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence)
267+
return (consumer, eventsSequence)
268+
}
269+
169270
/// Subscribe to the given list of `topics`.
170271
/// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
171272
/// - Parameter topics: An array of topic names to subscribe to.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// An enumeration representing events that can be received through the ``KafkaConsumerEvents`` asynchronous sequence.
public enum KafkaConsumerEvent: Sendable, Hashable {
    /// - Important: Always provide a `default` case when switching over this `enum`.
    // NOTE(review): the case name misspells "EXHAUSTIVELY". Renaming a public enum
    // case is an API break, so it is left as-is here — fix before tagging a release.
    case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

    /// Convert an internal librdkafka event to the public event type.
    /// Traps: every currently-known `RDKafkaClient.KafkaEvent` is either producer-only
    /// or handled by the ``KafkaConsumerMessages`` sequence, so no value is constructible yet.
    internal init(_ event: RDKafkaClient.KafkaEvent) {
        switch event {
        case .deliveryReport:
            fatalError("Cannot cast \(event) to KafkaConsumerEvent")
        case .consumerMessages:
            fatalError("Consumer messages should be handled in the KafkaConsumerMessages asynchronous sequence")
        }
    }
}

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ extension KafkaProducerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
4444
// MARK: - KafkaProducerEvents
4545

4646
/// `AsyncSequence` implementation for handling ``KafkaProducerEvent``s emitted by Kafka.
47-
public struct KafkaProducerEvents: AsyncSequence {
47+
public struct KafkaProducerEvents: Sendable, AsyncSequence {
4848
public typealias Element = KafkaProducerEvent
4949
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
5050
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaProducerCloseOnTerminate>
@@ -85,7 +85,7 @@ public final class KafkaProducer: Service, Sendable {
8585
/// Topic configuration that is used when a new topic has to be created by the producer.
8686
private let topicConfig: KafkaTopicConfiguration
8787

88-
// Private initializer, use factory methods to create KafkaProducer
88+
// Private initializer, use factory method or convenience init to create KafkaProducer
8989
/// Initialize a new ``KafkaProducer``.
9090
///
9191
/// - Parameter stateMachine: The ``KafkaProducer/StateMachine`` instance associated with the ``KafkaProducer``.
@@ -104,18 +104,18 @@ public final class KafkaProducer: Service, Sendable {
104104

105105
/// Initialize a new ``KafkaProducer``.
106106
///
107-
/// This factory method creates a producer without listening for events.
107+
/// This creates a producer without listening for events.
108108
///
109109
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
110110
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
111111
/// - Parameter logger: A logger.
112112
/// - Returns: The newly created ``KafkaProducer``.
113113
/// - Throws: A ``KafkaError`` if initializing the producer failed.
114-
public static func makeProducer(
114+
public convenience init(
115115
config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
116116
topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
117117
logger: Logger
118-
) throws -> KafkaProducer {
118+
) throws {
119119
let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
120120

121121
let client = try RDKafkaClient.makeClient(
@@ -125,20 +125,18 @@ public final class KafkaProducer: Service, Sendable {
125125
logger: logger
126126
)
127127

128-
let producer = try KafkaProducer(
129-
stateMachine: stateMachine,
130-
config: config,
131-
topicConfig: topicConfig
132-
)
133-
134128
stateMachine.withLockedValue {
135129
$0.initialize(
136130
client: client,
137131
source: nil
138132
)
139133
}
140134

141-
return producer
135+
try self.init(
136+
stateMachine: stateMachine,
137+
config: config,
138+
topicConfig: topicConfig
139+
)
142140
}
143141

144142
/// Initialize a new ``KafkaProducer`` and a ``KafkaProducerEvents`` asynchronous sequence.

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ final class KafkaProducerTests: XCTestCase {
246246
var config = KafkaProducerConfiguration()
247247
config.bootstrapServers = []
248248

249-
let producer = try KafkaProducer.makeProducer(config: config, logger: mockLogger)
249+
let producer = try KafkaProducer(config: config, logger: mockLogger)
250250

251251
let serviceGroup = ServiceGroup(
252252
services: [producer],

0 commit comments

Comments
 (0)