Skip to content

Strongly-typed Configurations #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 14, 2023
Merged
34 changes: 34 additions & 0 deletions Sources/SwiftKafka/KafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@ public struct KafkaConfig: Hashable, Equatable {
self.opaque = opaque
}

/// Initialize internal `KafkaConfig` object from a ``ProducerConfig`` provided by the new API.
convenience init(producerConfig: ProducerConfig) throws {
self.init(
pointer: rd_kafka_conf_new(),
opaque: nil
)

try producerConfig.dictionary.forEach { key, value in
try self.set(value, forKey: key)
}
}

/// Initialize internal `KafkaConfig` object from a ``ConsumerConfig`` provided by the new API.
convenience init(consumerConfig: ConsumerConfig) throws {
self.init(
pointer: rd_kafka_conf_new(),
opaque: nil
)

try consumerConfig.dictionary.forEach { key, value in
try self.set(value, forKey: key)
}
}

/// Initialize internal `KafkaConfig` object with default configuration.
convenience init() {
self.init(
Expand Down Expand Up @@ -160,6 +184,16 @@ public struct KafkaConfig: Hashable, Equatable {
self._internal = .init()
}

/// Initialize a legacy ``KafkaConfig`` from a ``ProducerConfig`` provided by the new API.
init(producerConfig: ProducerConfig) throws {
self._internal = try .init(producerConfig: producerConfig)
}

/// Initialize a legacy ``KafkaConfig`` from a ``ConsumerConfig`` provided by the new API.
init(consumerConfig: ConsumerConfig) throws {
self._internal = try .init(consumerConfig: consumerConfig)
}

/// Retrieve value of configuration property for `key`
public func value(forKey key: String) -> String? {
return self._internal.value(forKey: key)
Expand Down
31 changes: 31 additions & 0 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,37 @@ public final class KafkaConsumer {
)
}

// MARK: - Initialisers with new config

public convenience init(
topics: [String],
config: ConsumerConfig = ConsumerConfig(),
logger: Logger
) throws {
try self.init(
topics: topics,
groupID: config.groupID,
config: KafkaConfig(consumerConfig: config),
logger: logger
)
}

public convenience init(
topic: String,
partition: KafkaPartition,
offset: Int64,
config: ConsumerConfig = ConsumerConfig(),
logger: Logger
) throws {
try self.init(
topic: topic,
partition: partition,
offset: offset,
config: KafkaConfig(consumerConfig: config),
logger: logger
)
}

/// Subscribe to the given list of `topics`.
/// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
/// - Parameter topics: An array of topic names to subscribe to.
Expand Down
14 changes: 14 additions & 0 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ public actor KafkaProducer {
}
}

// MARK: - Initialiser with new config

public init(
config: ProducerConfig = ProducerConfig(),
topicConfig: TopicConfig = TopicConfig(),
logger: Logger
) async throws {
try await self.init(
config: KafkaConfig(producerConfig: config),
topicConfig: KafkaTopicConfig(topicConfig: topicConfig),
logger: logger
)
}

/// Method to shutdown the ``KafkaProducer``.
///
/// This method flushes any buffered messages and waits until a callback is received for all of them.
Expand Down
14 changes: 14 additions & 0 deletions Sources/SwiftKafka/KafkaTopicConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ public struct KafkaTopicConfig: Hashable, Equatable {
self.pointer = pointer
}

/// Initialize internal `KafkaTopicConfig` object from a ``TopicConfig`` provided by the new API.
convenience init(topicConfig: TopicConfig) throws {
self.init()

try topicConfig.dictionary.forEach { key, value in
try self.set(value, forKey: key)
}
}

deinit {
rd_kafka_topic_conf_destroy(pointer)
}
Expand Down Expand Up @@ -101,6 +110,11 @@ public struct KafkaTopicConfig: Hashable, Equatable {
self._internal = .init()
}

/// Initialize a legacy ``KafkaTopicConfig`` from a ``TopicConfig`` provided by the new API.
init(topicConfig: TopicConfig) throws {
self._internal = try .init(topicConfig: topicConfig)
}

/// Retrieve value of topic configuration property for `key`
public func value(forKey key: String) -> String? {
return self._internal.value(forKey: key)
Expand Down
81 changes: 81 additions & 0 deletions Sources/SwiftKafka/New/ConfigEnums.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-gsoc open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// Collection of `enum` types used in the configuration structs this library provides.
public struct ConfigEnums {
/// Available debug contexts to enable.
public struct DebugOption: Hashable, Equatable, CustomStringConvertible {
public let description: String

public static let generic = DebugOption(description: "generic")
public static let broker = DebugOption(description: "broker")
public static let topic = DebugOption(description: "topic")
public static let metadata = DebugOption(description: "metadata")
public static let feature = DebugOption(description: "feature")
public static let queue = DebugOption(description: "queue")
public static let msg = DebugOption(description: "msg")
public static let `protocol` = DebugOption(description: "protocol")
public static let cgrp = DebugOption(description: "cgrp")
public static let security = DebugOption(description: "security")
public static let fetch = DebugOption(description: "fetch")
public static let interceptor = DebugOption(description: "interceptor")
public static let plugin = DebugOption(description: "plugin")
public static let consumer = DebugOption(description: "consumer")
public static let admin = DebugOption(description: "admin")
public static let eos = DebugOption(description: "eos")
public static let all = DebugOption(description: "all")
}

/// Available IP address families.
public struct IPAddressFamily: Hashable, Equatable, CustomStringConvertible {
public let description: String

/// Use any IP address family.
public static let any = IPAddressFamily(description: "any")
/// Use the IPv4 address family.
public static let v4 = IPAddressFamily(description: "v4")
/// Use the IPv6 address family.
public static let v6 = IPAddressFamily(description: "v6")
}

/// Protocol used to communicate with brokers.
public struct SecurityProtocol: Hashable, Equatable, CustomStringConvertible {
public let description: String

/// Send messages as plaintext (no security protocol used).
public static let plaintext = SecurityProtocol(description: "plaintext")
/// Use the Secure Sockets Layer (SSL) protocol.
public static let ssl = SecurityProtocol(description: "ssl")
/// Use the Simple Authentication and Security Layer (SASL).
public static let saslPlaintext = SecurityProtocol(description: "sasl_plaintext")
/// Use the Simple Authentication and Security Layer (SASL) with SSL.
public static let saslSSL = SecurityProtocol(description: "sasl_ssl")
}

/// Available SASL mechanisms that can be used for authentication.
public struct SASLMechanism: Hashable, Equatable, CustomStringConvertible {
public let description: String

/// Use the GSSAPI mechanism.
public static let gssapi = SASLMechanism(description: "GSSAPI")
/// Use the PLAIN mechanism.
public static let plain = SASLMechanism(description: "PLAIN")
/// Use the SCRAM-SHA-256 mechanism.
public static let scramSHA256 = SASLMechanism(description: "SCRAM-SHA-256")
/// Use the SCRAM-SHA-512 mechanism.
public static let scramSHA512 = SASLMechanism(description: "SCRAM-SHA-512")
/// Use the OAUTHBEARER mechanism.
public static let oauthbearer = SASLMechanism(description: "OAUTHBEARER")
}
}
Loading