Skip to content

Commit a3df8d8

Browse files
KafkaProducerMessage: generic key & value (#95)
* `KafkaProducerMessage`: generic key & value Motivation: We want to allow our users to feed arbitrary data as `key` and `value` of a `KafkaProducerMessage`. Modifications: * add new `public protocol KafkaBuffer` which asks implementing types to provide safe access to an `UnsafeMutableRawBufferPointer` containing the data * implement `KafkaBuffer` in `String` * implement `KafkaBuffer` in `ByteBuffer` * update tests * Review Franz Modifications: * remove duplicate docc comments for `KafkaBuffer` adopters * spell out `K` and `V` as `Key` and `Value` * `String+KafkaBuffer`: only convert `String` to `ByteBuffer` on slow path (no contiguous bytes available) * make `Never` conform to `KafkaBuffer` * `KafkaProducerMessage`: make `key` optional * `KafkaContiguousBytes` type Modifications: * change `KafkaBuffer` `protocol` into `KafkaContiguousBytes` type (similar to `SwiftProtobuf/SwiftProtobufContiguousBytes`) * make `Array where Array.Element == UInt8` conform to `KafkaContiguousBytes` * Fix post-rebase issue * Changes Franz Modifications: * fix outdated `KafkaProducerMessage` documentation * simplify `String+KafkaContiguousBytes`
1 parent 0b4ccd6 commit a3df8d8

10 files changed

+200
-71
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
extension Array: KafkaContiguousBytes where Array.Element == UInt8 {}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extension ByteBuffer: KafkaContiguousBytes {
18+
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
19+
try self.withUnsafeReadableBytes {
20+
try body($0)
21+
}
22+
}
23+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// Conformance to this protocol gives users a way to provide their own "bag of bytes" types
16+
/// to be used for serialization of Kafka messages.
17+
/// It provides a general interface for bytes since the Swift Standard Library currently does not
18+
/// provide such a protocol.
19+
///
20+
/// By conforming your own types to this protocol, you will be able to pass instances of said types
21+
/// directly to ``KafkaProducerMessage`` as key and value.
22+
public protocol KafkaContiguousBytes {
23+
/// Calls the given closure with the contents of underlying storage.
24+
///
25+
/// - note: Calling `withUnsafeBytes` multiple times does not guarantee that
26+
/// the same buffer pointer will be passed in every time.
27+
/// - warning: The buffer argument to the body should not be stored or used
28+
/// outside of the lifetime of the call to the closure.
29+
func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R
30+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
extension Never: KafkaContiguousBytes {
16+
public func withUnsafeBytes<R>(_: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
17+
fatalError("This statement should never be reached")
18+
}
19+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extension String: KafkaContiguousBytes {
18+
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
19+
if let read = try self.utf8.withContiguousStorageIfAvailable({ unsafePointer in
20+
// Fast Path
21+
let unsafeRawBufferPointer = UnsafeRawBufferPointer(start: unsafePointer.baseAddress, count: self.count)
22+
return try body(unsafeRawBufferPointer)
23+
}) {
24+
return read
25+
} else {
26+
// Slow path
27+
return try ByteBuffer(string: self).withUnsafeBytes(body)
28+
}
29+
}
30+
}

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public final class KafkaProducer: Service, Sendable {
248248
/// of the corresponding ``KafkaAcknowledgedMessage``.
249249
/// - Throws: A ``KafkaError`` if sending the message failed.
250250
@discardableResult
251-
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
251+
public func send<Key, Value>(_ message: KafkaProducerMessage<Key, Value>) throws -> KafkaProducerMessageID {
252252
let action = try self.stateMachine.withLockedValue { try $0.send() }
253253
switch action {
254254
case .send(let client, let newMessageID, let topicHandles):

Sources/SwiftKafka/KafkaProducerMessage.swift

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,24 @@ import Crdkafka
1616
import NIOCore
1717

1818
/// Message that is sent by the `KafkaProducer`
19-
public struct KafkaProducerMessage {
19+
public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContiguousBytes> {
2020
public var topic: String
2121
public var partition: KafkaPartition
22-
public var key: ByteBuffer?
23-
public var value: ByteBuffer
22+
public var key: Key?
23+
public var value: Value
2424

25-
/// Create a new `KafkaProducerMessage` with a `ByteBuffer` key and value
26-
/// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
27-
/// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
28-
/// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
29-
/// - Parameter value: The message body.
25+
/// Create a new `KafkaProducerMessage` with a ``KafkaContiguousBytes`` key and value.
26+
///
27+
/// - Parameters:
28+
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
29+
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partition will be assigned automatically.
30+
/// - key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
31+
/// - value: The message body.
3032
public init(
3133
topic: String,
3234
partition: KafkaPartition? = nil,
33-
key: ByteBuffer? = nil,
34-
value: ByteBuffer
35+
key: Key,
36+
value: Value
3537
) {
3638
self.topic = topic
3739
self.key = key
@@ -43,30 +45,28 @@ public struct KafkaProducerMessage {
4345
self.partition = .unassigned
4446
}
4547
}
48+
}
4649

47-
/// Create a new `KafkaProducerMessage` with a `String` key and value
48-
/// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
49-
/// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
50-
/// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
51-
/// - Parameter value: The message body.
50+
extension KafkaProducerMessage where Key == Never {
51+
/// Create a new `KafkaProducerMessage` with a ``KafkaContiguousBytes`` value.
52+
///
53+
/// - Parameters:
54+
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
55+
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
56+
/// - value: The message body.
5257
public init(
5358
topic: String,
5459
partition: KafkaPartition? = nil,
55-
key: String? = nil,
56-
value: String
60+
value: Value
5761
) {
58-
let keyBuffer: ByteBuffer?
59-
if let key = key {
60-
keyBuffer = ByteBuffer(string: key)
62+
self.topic = topic
63+
self.value = value
64+
self.key = nil
65+
66+
if let partition = partition {
67+
self.partition = partition
6168
} else {
62-
keyBuffer = nil
69+
self.partition = .unassigned
6370
}
64-
65-
self.init(
66-
topic: topic,
67-
partition: partition,
68-
key: keyBuffer,
69-
value: ByteBuffer(string: value)
70-
)
7171
}
7272
}

Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -97,33 +97,45 @@ final class RDKafkaClient: Sendable {
9797
/// - Parameter newMessageID: ID that was assigned to the `message`.
9898
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
9999
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages
100-
func produce(
101-
message: KafkaProducerMessage,
100+
func produce<Key, Value>(
101+
message: KafkaProducerMessage<Key, Value>,
102102
newMessageID: UInt,
103103
topicConfig: KafkaTopicConfiguration,
104104
topicHandles: RDKafkaTopicHandles
105105
) throws {
106-
let keyBytes: [UInt8]?
107-
if var key = message.key {
108-
keyBytes = key.readBytes(length: key.readableBytes)
109-
} else {
110-
keyBytes = nil
111-
}
112-
113-
let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
114-
return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
115-
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
116-
// Returns 0 on success, error code otherwise.
117-
return rd_kafka_produce(
118-
topicHandle,
119-
message.partition.rawValue,
120-
RD_KAFKA_MSG_F_COPY,
121-
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
122-
valueBuffer.count,
123-
keyBytes,
124-
keyBytes?.count ?? 0,
125-
UnsafeMutableRawPointer(bitPattern: newMessageID)
126-
)
106+
let responseCode = try message.value.withUnsafeBytes { valueBuffer in
107+
try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
108+
if let key = message.key {
109+
// Key available, we can use scoped accessor to safely access its rawBufferPointer.
110+
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
111+
// Returns 0 on success, error code otherwise.
112+
return key.withUnsafeBytes { keyBuffer in
113+
return rd_kafka_produce(
114+
topicHandle,
115+
message.partition.rawValue,
116+
RD_KAFKA_MSG_F_COPY,
117+
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
118+
valueBuffer.count,
119+
keyBuffer.baseAddress,
120+
keyBuffer.count,
121+
UnsafeMutableRawPointer(bitPattern: newMessageID)
122+
)
123+
}
124+
} else {
125+
// No key set.
126+
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
127+
// Returns 0 on success, error code otherwise.
128+
return rd_kafka_produce(
129+
topicHandle,
130+
message.partition.rawValue,
131+
RD_KAFKA_MSG_F_COPY,
132+
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
133+
valueBuffer.count,
134+
nil,
135+
0,
136+
UnsafeMutableRawPointer(bitPattern: newMessageID)
137+
)
138+
}
127139
}
128140
}
129141

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import struct Foundation.UUID
16+
import NIOCore
1617
import ServiceLifecycle
1718
@testable import SwiftKafka
1819
import XCTest
@@ -138,8 +139,8 @@ final class SwiftKafkaTests: XCTestCase {
138139

139140
for (index, consumedMessage) in consumedMessages.enumerated() {
140141
XCTAssertEqual(testMessages[index].topic, consumedMessage.topic)
141-
XCTAssertEqual(testMessages[index].key, consumedMessage.key)
142-
XCTAssertEqual(testMessages[index].value, consumedMessage.value)
142+
XCTAssertEqual(ByteBuffer(string: testMessages[index].key!), consumedMessage.key)
143+
XCTAssertEqual(ByteBuffer(string: testMessages[index].value), consumedMessage.value)
143144
}
144145
}
145146

@@ -210,8 +211,8 @@ final class SwiftKafkaTests: XCTestCase {
210211

211212
for (index, consumedMessage) in consumedMessages.enumerated() {
212213
XCTAssertEqual(testMessages[index].topic, consumedMessage.topic)
213-
XCTAssertEqual(testMessages[index].key, consumedMessage.key)
214-
XCTAssertEqual(testMessages[index].value, consumedMessage.value)
214+
XCTAssertEqual(ByteBuffer(string: testMessages[index].key!), consumedMessage.key)
215+
XCTAssertEqual(ByteBuffer(string: testMessages[index].value), consumedMessage.value)
215216
}
216217
}
217218

@@ -352,8 +353,8 @@ final class SwiftKafkaTests: XCTestCase {
352353

353354
for (index, consumedMessage) in consumedMessages.enumerated() {
354355
XCTAssertEqual(testMessages[index].topic, consumedMessage.topic)
355-
XCTAssertEqual(testMessages[index].key, consumedMessage.key)
356-
XCTAssertEqual(testMessages[index].value, consumedMessage.value)
356+
XCTAssertEqual(ByteBuffer(string: testMessages[index].key!), consumedMessage.key)
357+
XCTAssertEqual(ByteBuffer(string: testMessages[index].value), consumedMessage.value)
357358
}
358359
}
359360

@@ -420,8 +421,8 @@ final class SwiftKafkaTests: XCTestCase {
420421

421422
for (index, consumedMessage) in consumedMessages.enumerated() {
422423
XCTAssertEqual(testMessages[firstConsumerOffset + index].topic, consumedMessage.topic)
423-
XCTAssertEqual(testMessages[firstConsumerOffset + index].key, consumedMessage.key)
424-
XCTAssertEqual(testMessages[firstConsumerOffset + index].value, consumedMessage.value)
424+
XCTAssertEqual(ByteBuffer(string: testMessages[firstConsumerOffset + index].key!), consumedMessage.key)
425+
XCTAssertEqual(ByteBuffer(string: testMessages[firstConsumerOffset + index].value), consumedMessage.value)
425426
}
426427
}
427428

@@ -434,7 +435,7 @@ final class SwiftKafkaTests: XCTestCase {
434435

435436
// MARK: - Helpers
436437

437-
private static func createTestMessages(topic: String, count: UInt) -> [KafkaProducerMessage] {
438+
private static func createTestMessages(topic: String, count: UInt) -> [KafkaProducerMessage<String, String>] {
438439
return Array(0..<count).map {
439440
KafkaProducerMessage(
440441
topic: topic,
@@ -447,7 +448,7 @@ final class SwiftKafkaTests: XCTestCase {
447448
private static func sendAndAcknowledgeMessages(
448449
producer: KafkaProducer,
449450
events: KafkaProducerEvents,
450-
messages: [KafkaProducerMessage]
451+
messages: [KafkaProducerMessage<String, String>]
451452
) async throws {
452453
var messageIDs = Set<KafkaProducerMessageID>()
453454

@@ -484,8 +485,8 @@ final class SwiftKafkaTests: XCTestCase {
484485
XCTAssertEqual(messages.count, acknowledgedMessages.count)
485486
for message in messages {
486487
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message.topic }))
487-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message.key }))
488-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message.value }))
488+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }))
489+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
489490
}
490491
}
491492
}

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ final class KafkaProducerTests: XCTestCase {
102102
}
103103

104104
XCTAssertEqual(expectedTopic, receivedMessage.topic)
105-
XCTAssertEqual(message.key, receivedMessage.key)
106-
XCTAssertEqual(message.value, receivedMessage.value)
105+
XCTAssertEqual(ByteBuffer(string: message.key!), receivedMessage.key)
106+
XCTAssertEqual(ByteBuffer(string: message.value), receivedMessage.value)
107107

108108
// Shutdown the serviceGroup
109109
await serviceGroup.triggerGracefulShutdown()
@@ -159,7 +159,6 @@ final class KafkaProducerTests: XCTestCase {
159159
}
160160

161161
XCTAssertEqual(expectedTopic, receivedMessage.topic)
162-
XCTAssertEqual(message.key, receivedMessage.key)
163162
XCTAssertEqual(message.value, receivedMessage.value)
164163

165164
// Shutdown the serviceGroup
@@ -227,10 +226,10 @@ final class KafkaProducerTests: XCTestCase {
227226
XCTAssertEqual(2, acknowledgedMessages.count)
228227
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message1.topic }))
229228
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message2.topic }))
230-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message1.key }))
231-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message2.key }))
232-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message1.value }))
233-
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == message2.value }))
229+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message1.key!) }))
230+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message2.key!) }))
231+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message1.value) }))
232+
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message2.value) }))
234233

235234
// Shutdown the serviceGroup
236235
await serviceGroup.triggerGracefulShutdown()

0 commit comments

Comments
 (0)