Commit 4f9520f

Merge remote-tracking branch 'origin/main' into feature/sc-2764/gsoc-support-for-transactions
2 parents b2c1619 + 071f51d commit 4f9520f

14 files changed: +362 -139 lines changed
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+extension Array: KafkaContiguousBytes where Array.Element == UInt8 {}
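For illustration (not part of this commit): with this conformance, a raw byte array can be used directly as a message payload. The topic name and the exact `KafkaProducerMessage` initializer labels below are assumptions.

```swift
// Hypothetical usage sketch: [UInt8] now satisfies KafkaContiguousBytes,
// so byte arrays can serve directly as message keys or values.
let payload: [UInt8] = [0xDE, 0xAD, 0xBE, 0xEF]
let message = KafkaProducerMessage(topic: "example-topic", value: payload)
```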
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import NIOCore
+
+extension ByteBuffer: KafkaContiguousBytes {
+    public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
+        try self.withUnsafeReadableBytes {
+            try body($0)
+        }
+    }
+}
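A short sketch (illustration only, assuming the SwiftKafka module is imported for the conformance): only the readable portion of the buffer reaches the closure, because the implementation forwards to `withUnsafeReadableBytes`.

```swift
import NIOCore

var buffer = ByteBufferAllocator().buffer(capacity: 16)
buffer.writeString("hello")
// The closure sees exactly the 5 readable bytes, not the buffer's capacity.
let readableCount = buffer.withUnsafeBytes { $0.count }
```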
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+/// Conformance to this protocol gives users a way to provide their own "bag of bytes" types
+/// to be used for serialization of Kafka messages.
+/// It provides a general interface for bytes since the Swift Standard Library currently does not
+/// provide such a protocol.
+///
+/// By conforming your own types to this protocol, you will be able to pass instances of said types
+/// directly to ``KafkaProducerMessage`` as key and value.
+public protocol KafkaContiguousBytes {
+    /// Calls the given closure with the contents of the underlying storage.
+    ///
+    /// - note: Calling `withUnsafeBytes` multiple times does not guarantee that
+    ///         the same buffer pointer will be passed in every time.
+    /// - warning: The buffer argument to the body should not be stored or used
+    ///            outside of the lifetime of the call to the closure.
+    func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R
+}
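As a sketch of what user code can now do (not part of this commit): Foundation's `Data` already provides a `withUnsafeBytes` with a matching shape, so an empty conformance is enough.

```swift
import Foundation

// Hypothetical user-side conformance; Data's own withUnsafeBytes
// already satisfies the protocol requirement, so no body is needed.
extension Data: KafkaContiguousBytes {}
```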
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+extension Never: KafkaContiguousBytes {
+    public func withUnsafeBytes<R>(_: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
+        fatalError("This statement should never be reached")
+    }
+}
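This conformance presumably exists so generic code can use `Never` where no bytes will ever be supplied (for example, for messages without a key); the `fatalError` is unreachable because no value of type `Never` can be constructed. A sketch of why that is safe:

```swift
// Generic code may constrain a type parameter to KafkaContiguousBytes.
// Instantiated with Never, this function can never actually be called,
// because a Never-typed argument cannot exist.
func byteCount<Bytes: KafkaContiguousBytes>(_ bytes: Bytes) -> Int {
    bytes.withUnsafeBytes { $0.count }
}
```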
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import NIOCore
+
+extension String: KafkaContiguousBytes {
+    public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
+        if let read = try self.utf8.withContiguousStorageIfAvailable({ unsafePointer in
+            // Fast path: size the buffer with the UTF-8 byte count, not the Character count
+            let unsafeRawBufferPointer = UnsafeRawBufferPointer(start: unsafePointer.baseAddress, count: self.utf8.count)
+            return try body(unsafeRawBufferPointer)
+        }) {
+            return read
+        } else {
+            // Slow path: copy the string into a ByteBuffer and use its bytes
+            return try ByteBuffer(string: self).withUnsafeBytes(body)
+        }
+    }
+}
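A small sketch of the conformance in use: native Swift strings normally hold their UTF-8 contiguously and take the fast path, while bridged strings fall back to the `ByteBuffer` copy. It also shows why the UTF-8 byte count, not the `Character` count, must size the buffer.

```swift
let greeting = "héllo"
// 5 Characters, but 6 UTF-8 bytes ("é" encodes as two bytes);
// the closure receives all 6 bytes.
let utf8ByteCount = greeting.withUnsafeBytes { $0.count }
```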

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 122 additions & 15 deletions
@@ -35,6 +35,29 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
     }
 }
 
+// MARK: - KafkaConsumerEvents
+
+/// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
+public struct KafkaConsumerEvents: Sendable, AsyncSequence {
+    public typealias Element = KafkaConsumerEvent
+    typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
+    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerCloseOnTerminate>
+    let wrappedSequence: WrappedSequence
+
+    /// `AsyncIteratorProtocol` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
+    public struct AsyncIterator: AsyncIteratorProtocol {
+        var wrappedIterator: WrappedSequence.AsyncIterator
+
+        public mutating func next() async -> Element? {
+            await self.wrappedIterator.next()
+        }
+    }
+
+    public func makeAsyncIterator() -> AsyncIterator {
+        return AsyncIterator(wrappedIterator: self.wrappedSequence.makeAsyncIterator())
+    }
+}
+
 // MARK: - KafkaConsumerMessages
 
 /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
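For illustration (not part of the diff): the new sequence drains like any other `AsyncSequence`.

```swift
// Sketch: KafkaConsumerEvents is a non-throwing AsyncSequence,
// so a plain `for await` loop is enough to drain it.
func drain(_ events: KafkaConsumerEvents) async {
    for await event in events {
        print("consumer event: \(event)")
    }
}
```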
@@ -52,7 +75,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
     let wrappedSequence: WrappedSequence
 
     /// `AsyncIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
-    public struct ConsumerMessagesAsyncIterator: AsyncIteratorProtocol {
+    public struct AsyncIterator: AsyncIteratorProtocol {
         let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
         var wrappedIterator: WrappedSequence.AsyncIterator?
 
@@ -80,8 +103,8 @@
         }
     }
 
-    public func makeAsyncIterator() -> ConsumerMessagesAsyncIterator {
-        return ConsumerMessagesAsyncIterator(
+    public func makeAsyncIterator() -> AsyncIterator {
+        return AsyncIterator(
             stateMachine: self.stateMachine,
             wrappedIterator: self.wrappedSequence.makeAsyncIterator()
         )
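The rename to the standard-library-conventional `AsyncIterator` name is invisible to `for await` loops; a sketch (assuming ``KafkaConsumerMessage`` exposes a `topic` property, as elsewhere in this library):

```swift
// Sketch: iteration is unaffected by the iterator type rename.
func consumeMessages(_ consumer: KafkaConsumer) async throws {
    for try await message in consumer.messages {
        print("received message on topic \(message.topic)")
    }
}
```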
@@ -108,28 +131,27 @@ public final class KafkaConsumer: Sendable, Service {
     /// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives.
     public let messages: KafkaConsumerMessages
 
+    // Private initializer, use factory method or convenience init to create KafkaConsumer
     /// Initialize a new ``KafkaConsumer``.
     /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)``
     /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``.
-    /// - Parameter config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
-    /// - Parameter logger: A logger.
+    ///
+    /// - Parameters:
+    ///   - client: Client used for handling the connection to the Kafka cluster.
+    ///   - stateMachine: The state machine containing the state of the ``KafkaConsumer``.
+    ///   - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
+    ///   - logger: A logger.
     /// - Throws: A ``KafkaError`` if the initialization failed.
-    public init(
+    private init(
+        client: RDKafkaClient,
+        stateMachine: NIOLockedValueBox<StateMachine>,
         config: KafkaConsumerConfiguration,
         logger: Logger
     ) throws {
         self.config = config
+        self.stateMachine = stateMachine
         self.logger = logger
 
-        let client = try RDKafkaClient.makeClient(
-            type: .consumer,
-            configDictionary: config.dictionary,
-            events: [.log, .fetch, .offsetCommit],
-            logger: logger
-        )
-
-        self.stateMachine = NIOLockedValueBox(StateMachine(logger: self.logger))
-
         let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
             elementType: KafkaConsumerMessage.self,
             backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
@@ -160,6 +182,91 @@ public final class KafkaConsumer: Sendable, Service {
         }
     }
 
+    /// Initialize a new ``KafkaConsumer``.
+    ///
+    /// This creates a consumer that does not listen to any events other than consumer messages.
+    ///
+    /// - Parameters:
+    ///   - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
+    ///   - logger: A logger.
+    /// - Returns: The newly created ``KafkaConsumer``.
+    /// - Throws: A ``KafkaError`` if the initialization failed.
+    public convenience init(
+        config: KafkaConsumerConfiguration,
+        logger: Logger
+    ) throws {
+        let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
+
+        var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
+        // Only listen to offset commit events when autoCommit is false
+        if config.enableAutoCommit == false {
+            subscribedEvents.append(.offsetCommit)
+        }
+
+        let client = try RDKafkaClient.makeClient(
+            type: .consumer,
+            configDictionary: config.dictionary,
+            events: subscribedEvents,
+            logger: logger
+        )
+
+        try self.init(
+            client: client,
+            stateMachine: stateMachine,
+            config: config,
+            logger: logger
+        )
+    }
+
+    /// Initialize a new ``KafkaConsumer`` and a ``KafkaConsumerEvents`` asynchronous sequence.
+    ///
+    /// Use the asynchronous sequence to consume events.
+    ///
+    /// - Important: When the asynchronous sequence is deinited, the consumer will be shut down and stop consuming messages.
+    ///   Additionally, make sure to consume the asynchronous sequence, otherwise the events will be buffered in memory indefinitely.
+    ///
+    /// - Parameters:
+    ///   - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
+    ///   - logger: A logger.
+    /// - Returns: A tuple containing the created ``KafkaConsumer`` and the ``KafkaConsumerEvents``
+    ///   `AsyncSequence` used for receiving events.
+    /// - Throws: A ``KafkaError`` if the initialization failed.
+    public static func makeConsumerWithEvents(
+        config: KafkaConsumerConfiguration,
+        logger: Logger
+    ) throws -> (KafkaConsumer, KafkaConsumerEvents) {
+        let stateMachine = NIOLockedValueBox(StateMachine(logger: logger))
+
+        var subscribedEvents: [RDKafkaEvent] = [.log, .fetch]
+        // Only listen to offset commit events when autoCommit is false
+        if config.enableAutoCommit == false {
+            subscribedEvents.append(.offsetCommit)
+        }
+
+        let client = try RDKafkaClient.makeClient(
+            type: .consumer,
+            configDictionary: config.dictionary,
+            events: subscribedEvents,
+            logger: logger
+        )
+
+        let consumer = try KafkaConsumer(
+            client: client,
+            stateMachine: stateMachine,
+            config: config,
+            logger: logger
+        )
+
+        let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
+            elementType: KafkaConsumerEvent.self,
+            backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
+            delegate: KafkaConsumerCloseOnTerminate(stateMachine: stateMachine)
+        )
+
+        let eventsSequence = KafkaConsumerEvents(wrappedSequence: sourceAndSequence.sequence)
+        return (consumer, eventsSequence)
+    }
+
     /// Subscribe to the given list of `topics`.
     /// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
     /// - Parameter topics: An array of topic names to subscribe to.
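From the caller's side, the factory method and the events sequence combine like this (a sketch; configuration construction is elided):

```swift
import Logging

func runConsumer(config: KafkaConsumerConfiguration, logger: Logger) async throws {
    // The factory returns the consumer together with its events sequence.
    let (consumer, events) = try KafkaConsumer.makeConsumerWithEvents(
        config: config,
        logger: logger
    )

    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            // Messages arrive on the throwing messages sequence.
            for try await message in consumer.messages {
                logger.info("message: \(message)")
            }
        }
        group.addTask {
            // Drain events, or they buffer in memory indefinitely.
            for await event in events {
                logger.info("event: \(event)")
            }
        }
        try await group.waitForAll()
    }
}
```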
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-gsoc open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+/// An enumeration representing events that can be received through the ``KafkaConsumerEvents`` asynchronous sequence.
+public enum KafkaConsumerEvent: Sendable, Hashable {
+    /// - Important: Always provide a `default` case when switching over this `enum`.
+    case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY
+
+    internal init(_ event: RDKafkaClient.KafkaEvent) {
+        switch event {
+        case .deliveryReport:
+            fatalError("Cannot cast \(event) to KafkaConsumerEvent")
+        case .consumerMessages:
+            fatalError("Consumer messages should be handled in the KafkaConsumerMessages asynchronous sequence")
+        }
+    }
+}
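Given the placeholder case, downstream code should treat the enum as open-ended; a sketch of the intended pattern:

```swift
// Sketch: honor the "always provide a default" contract.
func describe(_ event: KafkaConsumerEvent) -> String {
    switch event {
    default:
        // New event kinds may appear; never switch exhaustively.
        return "consumer event: \(event)"
    }
}
```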

Sources/SwiftKafka/KafkaError.swift

Lines changed: 0 additions & 13 deletions
@@ -64,16 +64,6 @@ public struct KafkaError: Error, CustomStringConvertible {
         )
     }
 
-    static func acknowledgement(
-        reason: String, file: String = #fileID, line: UInt = #line
-    ) -> KafkaError {
-        return KafkaError(
-            backing: .init(
-                code: .acknowledgement, reason: reason, file: file, line: line
-            )
-        )
-    }
-
     static func config(
         reason: String, file: String = #fileID, line: UInt = #line
     ) -> KafkaError {
@@ -184,7 +174,6 @@
     public struct ErrorCode: Hashable, Sendable, CustomStringConvertible {
         fileprivate enum BackingCode {
             case rdKafkaError
-            case acknowledgement
             case config
             case topicConfig
             case connectionClosed
@@ -206,8 +195,6 @@
 
         /// Errors caused by the underlying `librdkafka` library.
         public static let rdKafkaError = ErrorCode(.rdKafkaError)
-        /// A ``KafkaProducerMessage`` could not be acknowledged by the Kafka server.
-        public static let acknowledgement = ErrorCode(.acknowledgement)
         /// There is an error in the Kafka client configuration.
         public static let config = ErrorCode(.config)
         /// There is an error in the Kafka topic configuration.