Skip to content

Commit 01b33ba

Browse files
committed
Adapt new config structs in legacy implementation
Modifications: * `ClientConfig` protocol with protocol extension for properties used in both consumer and producer * `ConsumerConfig` and `ProducerConfig` structs * added initializer to `KafkaConfig` that allows conversion from `ClientConfig` to `KafkaConfig`
1 parent 9f62dce commit 01b33ba

File tree

6 files changed

+591
-151
lines changed

6 files changed

+591
-151
lines changed

Sources/SwiftKafka/KafkaConfig.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ public struct KafkaConfig: Hashable, Equatable {
4646
self.opaque = opaque
4747
}
4848

49+
// TODO: docc
50+
convenience init(clientConfig: any ClientConfig) throws {
51+
self.init(
52+
pointer: rd_kafka_conf_new(),
53+
opaque: nil
54+
)
55+
56+
try clientConfig.properties.forEach { key, value in
57+
try self.set(value, forKey: key)
58+
}
59+
}
60+
4961
/// Initialize internal `KafkaConfig` object with default configuration.
5062
convenience init() {
5163
self.init(
@@ -160,6 +172,11 @@ public struct KafkaConfig: Hashable, Equatable {
160172
self._internal = .init()
161173
}
162174

175+
// TODO: docc new public interface for config
176+
init(clientConfig: any ClientConfig) throws {
177+
self._internal = try .init(clientConfig: clientConfig)
178+
}
179+
163180
/// Retrieve value of configuration property for `key`
164181
public func value(forKey key: String) -> String? {
165182
return self._internal.value(forKey: key)

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,39 @@ public final class KafkaConsumer {
193193
)
194194
}
195195

196+
// MARK: - Initialisers with new config
197+
198+
// TODO: refactor into single initializer for KafkaConsumer and determine assignment/subscription via config
199+
200+
public convenience init(
201+
topics: [String], // TODO: pass topics via config?
202+
config: ConsumerConfig = ConsumerConfig(),
203+
logger: Logger
204+
) throws {
205+
try self.init(
206+
topics: topics,
207+
groupID: config.groupID,
208+
config: KafkaConfig(clientConfig: config),
209+
logger: logger
210+
)
211+
}
212+
213+
public convenience init(
214+
topic: String, // TODO: pass topic via config?
215+
partition: KafkaPartition,
216+
offset: Int64,
217+
config: ConsumerConfig = ConsumerConfig(),
218+
logger: Logger
219+
) throws {
220+
try self.init(
221+
topic: topic,
222+
partition: partition,
223+
offset: offset,
224+
config: KafkaConfig(clientConfig: config),
225+
logger: logger
226+
)
227+
}
228+
196229
/// Subscribe to the given list of `topics`.
197230
/// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
198231
/// - Parameter topics: An array of topic names to subscribe to.

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,16 @@ public actor KafkaProducer {
136136
}
137137
}
138138

139+
// MARK: - Initialiser with new config
140+
141+
public init(
142+
config: ProducerConfig = ProducerConfig(),
143+
topicConfig: KafkaTopicConfig = KafkaTopicConfig(), // TODO: new topic config
144+
logger: Logger
145+
) async throws {
146+
try await self.init(config: KafkaConfig(clientConfig: config), topicConfig: topicConfig, logger: logger)
147+
}
148+
139149
/// Method to shutdown the ``KafkaProducer``.
140150
///
141151
/// This method flushes any buffered messages and waits until a callback is received for all of them.

0 commit comments

Comments
 (0)