Commit f3ca724
Improved implementation of KafkaError and KafkaAcknowledgedMessageError (#36)
* Refactored KafkaError

  Modifications:
  * made KafkaError public
  * distinction between rdKafkaError and anyError in KafkaError
  * implement more specific errors

* Refactored KafkaAcknowledgedMessageError

* Added DocC Throws comments + ran swiftformat

* Improved structure of KafkaError

  Modifications:
  * refactor KafkaError to have a forwarded, evolvable type that captures as much information as possible (proposed by FranzBusch in #36)
  * fixed two broken documentation links

* Refactor KafkaAcknowledgedMessageError

  Modifications:
  * change the structure of KafkaAcknowledgedMessageError so that it only contains a messageID and a wrapped KafkaError
  * introduced KafkaError.ErrorCode.acknowledgement
1 parent a01b0e2 commit f3ca724

10 files changed: +319 −74 lines
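Although Sources/SwiftKafka/KafkaError.swift is among the changed files, its diff is not reproduced in this excerpt. As orientation only, a minimal sketch of the "evolvable type" structure the commit message describes might look like the following; the ErrorCode.acknowledgement code and the factory names (.config(reason:), .client(reason:), .rdKafkaError(wrapping:), ...) are taken from the commit message and the hunks below, while the stored properties and initializer shape are assumptions:

/// Hypothetical sketch only - not the actual contents of KafkaError.swift.
public struct KafkaError: Error, CustomStringConvertible {
    /// Extensible error code, so new codes (like `.acknowledgement`) can be added
    /// without breaking the public API.
    public struct ErrorCode: Hashable, CustomStringConvertible {
        fileprivate enum Code: String {
            case rdKafkaError, config, client, connectionClosed, messageConsumption, acknowledgement
        }

        fileprivate let code: Code

        public static let config = ErrorCode(code: .config)
        public static let acknowledgement = ErrorCode(code: .acknowledgement)

        public var description: String { self.code.rawValue }
    }

    /// Classification of the error.
    public let code: ErrorCode
    // Assumed stored properties capturing "as much information as possible".
    private let reason: String
    private let file: String
    private let line: UInt

    public var description: String {
        "KafkaError.\(self.code): \(self.reason) (\(self.file):\(self.line))"
    }

    // Factory methods mirroring the call sites in the hunks below.
    // The real type also exposes rdKafkaError(wrapping:) for librdkafka's
    // rd_kafka_resp_err_t; it is omitted here to avoid the Crdkafka import.
    static func config(reason: String, file: String = #fileID, line: UInt = #line) -> Self {
        .init(code: .config, reason: reason, file: file, line: line)
    }

    static func acknowledgement(reason: String, file: String = #fileID, line: UInt = #line) -> Self {
        .init(code: .acknowledgement, reason: reason, file: file, line: line)
    }
}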

Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 8 additions & 7 deletions

@@ -18,7 +18,7 @@ import NIOCore
 /// A message produced by the client and acknowledged by the Kafka cluster.
 public struct KafkaAcknowledgedMessage: Hashable {
     /// The unique identifier assigned by the ``KafkaProducer`` when the message was send to Kafka.
-    /// The same identifier is returned by ``KafkaProducer/sendAsync(message:)`` and can be used to correlate
+    /// The same identifier is returned by ``KafkaProducer/sendAsync(_:)`` and can be used to correlate
     /// a sent message and an acknowledged message.
     public var id: UInt
     /// The topic that the message was sent to.
@@ -33,6 +33,7 @@ public struct KafkaAcknowledgedMessage: Hashable {
     public var offset: Int64

     /// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
+    /// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
     init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: UInt) throws {
         self.id = id

@@ -41,15 +42,15 @@ public struct KafkaAcknowledgedMessage: Hashable {
         let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)
         self.value = ByteBuffer(bytes: valueBufferPointer)

-        guard rdKafkaMessage.err.rawValue == 0 else {
+        guard rdKafkaMessage.err == RD_KAFKA_RESP_ERR_NO_ERROR else {
             var errorStringBuffer = self.value
             let errorString = errorStringBuffer.readString(length: errorStringBuffer.readableBytes)

-            throw KafkaAcknowledgedMessageError(
-                rawValue: rdKafkaMessage.err.rawValue,
-                description: errorString,
-                messageID: self.id
-            )
+            if let errorString {
+                throw KafkaAcknowledgedMessageError.fromMessage(messageID: self.id, message: errorString)
+            } else {
+                throw KafkaAcknowledgedMessageError.fromRDKafkaError(messageID: self.id, error: rdKafkaMessage.err)
+            }
         }

         guard let topic = String(validatingUTF8: rd_kafka_topic_name(rdKafkaMessage.rkt)) else {

Sources/SwiftKafka/KafkaAcknowledgedMessageError.swift

Lines changed: 46 additions & 5 deletions

@@ -12,12 +12,53 @@
 //
 //===----------------------------------------------------------------------===//

+import Crdkafka
+
 /// Error caused by the Kafka cluster when trying to process a message produced by ``KafkaProducer``.
-public struct KafkaAcknowledgedMessageError: Error {
-    /// A raw value representing the error code.
-    public var rawValue: Int32
-    /// A string describing the error.
-    public var description: String?
+public struct KafkaAcknowledgedMessageError: Error, CustomStringConvertible {
     /// Identifier of the message that caused the error.
     public var messageID: UInt
+    /// The underlying ``KafkaError``.
+    public let error: KafkaError
+
+    init(messageID: UInt, error: KafkaError) {
+        self.messageID = messageID
+        self.error = error
+    }
+
+    public var description: String {
+        self.error.description
+    }
+
+    static func fromRDKafkaError(
+        messageID: UInt,
+        error: rd_kafka_resp_err_t,
+        file: String = #fileID,
+        line: UInt = #line
+    ) -> Self {
+        .init(
+            messageID: messageID,
+            error: .rdKafkaError(
+                wrapping: error,
+                file: file,
+                line: line
+            )
+        )
+    }
+
+    static func fromMessage(
+        messageID: UInt,
+        message: String,
+        file: String = #fileID,
+        line: UInt = #line
+    ) -> Self {
+        .init(
+            messageID: messageID,
+            error: .acknowledgement(
+                reason: message,
+                file: file,
+                line: line
+            )
+        )
+    }
 }
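With this change a failed acknowledgement carries both the message ID and the wrapped KafkaError. A hedged usage sketch follows; the producer's acknowledgement API is not part of this diff, so the `acknowledgements` sequence and its element type are assumptions:

// Assumed: `producer.acknowledgements` yields
// Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>,
// and this loop runs inside an async context.
for await acknowledgement in producer.acknowledgements {
    switch acknowledgement {
    case .success(let message):
        print("message \(message.id) acknowledged at offset \(message.offset)")
    case .failure(let error):
        // messageID correlates back to the id returned by sendAsync(_:).
        print("message \(error.messageID) failed: \(error.description)")
    }
}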

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ final class KafkaClient {
             rd_kafka_conf_destroy(duplicateConfig)

             let errorString = String(cString: errorChars)
-            throw KafkaError(description: errorString)
+            throw KafkaError.client(reason: errorString)
         }

         return handle

Sources/SwiftKafka/KafkaConfig.swift

Lines changed: 2 additions & 1 deletion

@@ -90,7 +90,7 @@ public struct KafkaConfig: Hashable, Equatable {

         if configResult != RD_KAFKA_CONF_OK {
             let errorString = String(cString: errorChars)
-            throw KafkaError(description: errorString)
+            throw KafkaError.config(reason: errorString)
         }
     }

@@ -166,6 +166,7 @@ public struct KafkaConfig: Hashable, Equatable {
     }

     /// Set configuration `value` for `key`
+    /// - Throws: A ``KafkaError`` if setting the value failed.
     public mutating func set(_ value: String, forKey key: String) throws {
         // Copy-on-write mechanism
         if !isKnownUniquelyReferenced(&(self._internal)) {
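A short sketch of how a configuration failure now surfaces as a typed error; it assumes KafkaConfig offers a default initializer, which is not shown in this diff:

var config = KafkaConfig() // assumed default initializer
do {
    // Keys follow librdkafka's configuration property names.
    try config.set("false", forKey: "enable.auto.commit")
    try config.set("example-group", forKey: "group.id")
} catch let error as KafkaError {
    // An invalid key or value is reported as KafkaError.config(reason:).
    print("configuration failed: \(error)")
}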

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 22 additions & 12 deletions

@@ -34,7 +34,7 @@ private struct ConsumerMessagesAsyncSequenceDelegate: NIOAsyncSequenceProducerDe

 /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
 public struct ConsumerMessagesAsyncSequence: AsyncSequence {
-    public typealias Element = Result<KafkaConsumerMessage, Error> // TODO: replace with something like KafkaConsumerError
+    public typealias Element = Result<KafkaConsumerMessage, KafkaError>
     typealias HighLowWatermark = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark
     fileprivate let wrappedSequence: NIOAsyncSequenceProducer<Element, HighLowWatermark, ConsumerMessagesAsyncSequenceDelegate>

@@ -74,7 +74,7 @@ public final class KafkaConsumer {

     // We use implicitly unwrapped optionals here as these properties need to access self upon initialization
     /// Type of the values returned by the ``messages`` sequence.
-    private typealias Element = Result<KafkaConsumerMessage, Error> // TODO: replace with a more specific Error type
+    private typealias Element = Result<KafkaConsumerMessage, KafkaError>
     private var messagesSource: NIOAsyncSequenceProducer<
         Element,
         ConsumerMessagesAsyncSequence.HighLowWatermark,
@@ -88,6 +88,7 @@ public final class KafkaConsumer {
     /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``.
     /// - Parameter config: The ``KafkaConfig`` for configuring the ``KafkaConsumer``.
     /// - Parameter logger: A logger.
+    /// - Throws: A ``KafkaError`` if the initialization failed.
     private init(
         config: KafkaConfig,
         logger: Logger
@@ -104,7 +105,7 @@ public final class KafkaConsumer {
             rd_kafka_poll_set_consumer(handle)
         }
         guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            throw KafkaError(rawValue: result.rawValue)
+            throw KafkaError.rdKafkaError(wrapping: result)
         }

         self.serialQueue = DispatchQueue(label: "swift-kafka-gsoc.consumer.serial")
@@ -136,6 +137,7 @@ public final class KafkaConsumer {
     /// - Parameter groupID: Name of the consumer group that this ``KafkaConsumer`` will create / join.
     /// - Parameter config: The ``KafkaConfig`` for configuring the ``KafkaConsumer``.
     /// - Parameter logger: A logger.
+    /// - Throws: A ``KafkaError`` if the initialization failed.
     public convenience init(
         topics: [String],
         groupID: String,
@@ -145,7 +147,7 @@ public final class KafkaConsumer {
         var config = config
         if let configGroupID = config.value(forKey: "group.id") {
             if configGroupID != groupID {
-                throw KafkaError(description: "Group ID does not match with group ID found in the configuration")
+                throw KafkaError.config(reason: "Group ID does not match with group ID found in the configuration")
             }
         } else {
             try config.set(groupID, forKey: "group.id")
@@ -164,6 +166,7 @@ public final class KafkaConsumer {
     /// - Parameter offset: The topic offset where reading begins. Defaults to the offset of the last read message.
     /// - Parameter config: The ``KafkaConfig`` for configuring the ``KafkaConsumer``.
     /// - Parameter logger: A logger.
+    /// - Throws: A ``KafkaError`` if the initialization failed.
     /// - Note: This consumer ignores the `group.id` property of its `config`.
     public convenience init(
         topic: String,
@@ -193,6 +196,7 @@ public final class KafkaConsumer {
     /// Subscribe to the given list of `topics`.
     /// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
     /// - Parameter topics: An array of topic names to subscribe to.
+    /// - Throws: A ``KafkaError`` if subscribing to the topic list failed.
     private func subscribe(topics: [String]) throws {
         assert(!closed)

@@ -208,15 +212,16 @@ public final class KafkaConsumer {
             rd_kafka_subscribe(handle, subscribedTopicsPointer)
         }

-        guard result.rawValue == 0 else {
-            throw KafkaError(rawValue: result.rawValue)
+        guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
+            throw KafkaError.rdKafkaError(wrapping: result)
         }
     }

     /// Assign the``KafkaConsumer`` to a specific `partition` of a `topic`.
     /// - Parameter topic: Name of the topic that this ``KafkaConsumer`` will read from.
     /// - Parameter partition: Partition that this ``KafkaConsumer`` will read from.
     /// - Parameter offset: The topic offset where reading begins. Defaults to the offset of the last read message.
+    /// - Throws: A ``KafkaError`` if the consumer could not be assigned to the topic + partition pair.
     private func assign(
         topic: String,
         partition: KafkaPartition,
@@ -239,7 +244,7 @@ public final class KafkaConsumer {
         }

         guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            throw KafkaError(rawValue: result.rawValue)
+            throw KafkaError.rdKafkaError(wrapping: result)
         }
     }

@@ -257,8 +262,11 @@ public final class KafkaConsumer {
                 return
             }
             messageResult = .success(message)
+        } catch let kafkaError as KafkaError {
+            messageResult = .failure(kafkaError)
         } catch {
-            messageResult = .failure(error)
+            self.logger.error("KafkaConsumer caught error: \(error)")
+            return
         }

         let yieldresult = self.messagesSource.yield(messageResult)
@@ -275,6 +283,7 @@ public final class KafkaConsumer {
     /// This method blocks for a maximum of `timeout` milliseconds.
     /// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message.
     /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
+    /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
     private func poll(timeout: Int32 = 100) throws -> KafkaConsumerMessage? {
         dispatchPrecondition(condition: .onQueue(self.serialQueue))
         assert(!closed)
@@ -307,6 +316,7 @@ public final class KafkaConsumer {
     /// Mark `message` in the topic as read and request the next message from the topic.
     /// This method is only used for manual offset management.
     /// - Parameter message: Last received message that shall be marked as read.
+    /// - Throws: A ``KafkaError`` if committing failed.
     /// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
     public func commitSync(_ message: KafkaConsumerMessage) async throws {
         try await self.serializeWithThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
@@ -323,11 +333,11 @@ public final class KafkaConsumer {
     private func _commitSync(_ message: KafkaConsumerMessage) throws {
         dispatchPrecondition(condition: .onQueue(self.serialQueue))
         guard !self.closed else {
-            throw KafkaError(description: "Trying to invoke method on consumer that has been closed.")
+            throw KafkaError.connectionClosed(reason: "Tried to commit message offset on a closed consumer")
         }

         guard self.config.value(forKey: "enable.auto.commit") == "false" else {
-            throw KafkaError(description: "Committing manually only works if enable.auto.commit is set to false")
+            throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
         }

         let changesList = rd_kafka_topic_partition_list_new(1)
@@ -352,7 +362,7 @@ public final class KafkaConsumer {
             )
         }
         guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            throw KafkaError(rawValue: result.rawValue)
+            throw KafkaError.rdKafkaError(wrapping: result)
         }
         return
     }
@@ -371,7 +381,7 @@ public final class KafkaConsumer {
         rd_kafka_topic_partition_list_destroy(self.subscribedTopicsPointer)

         guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            let error = KafkaError(rawValue: result.rawValue)
+            let error = KafkaError.rdKafkaError(wrapping: result)
             self.logger.error("Closing KafkaConsumer failed: \(error.description)")
             return
         }
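Since the sequence element is now Result<KafkaConsumerMessage, KafkaError>, callers can pattern-match on a typed error. A hedged consumption sketch; the `messages` property name comes from the DocC comment in the hunks above, while `topic` and `value` on the message are assumed from the initializer shown in KafkaConsumerMessage.swift below:

// Assumed: `consumer` is a running KafkaConsumer and this loop runs
// inside an async context.
for await messageResult in consumer.messages {
    switch messageResult {
    case .success(let message):
        print("\(message.topic)@\(message.offset): \(String(buffer: message.value))")
    case .failure(let error):
        // Typed KafkaError instead of an untyped Error.
        print("consumer error: \(error)")
    }
}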

Sources/SwiftKafka/KafkaConsumerMessage.swift

Lines changed: 6 additions & 4 deletions

@@ -29,6 +29,7 @@ public struct KafkaConsumerMessage: Hashable {
     public var offset: Int64

     /// Initialize ``KafkaConsumerMessage`` from `rd_kafka_message_t` pointer.
+    /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
     init(messagePointer: UnsafePointer<rd_kafka_message_t>) throws {
         let rdKafkaMessage = messagePointer.pointee

@@ -42,10 +43,11 @@ public struct KafkaConsumerMessage: Hashable {
             var errorStringBuffer = ByteBuffer(bytes: valueBufferPointer)
             let errorString = errorStringBuffer.readString(length: errorStringBuffer.readableBytes)

-            throw KafkaError(
-                rawValue: rdKafkaMessage.err.rawValue,
-                description: errorString ?? ""
-            )
+            if let errorString {
+                throw KafkaError.messageConsumption(reason: errorString)
+            } else {
+                throw KafkaError.rdKafkaError(wrapping: rdKafkaMessage.err)
+            }
         }

         guard let topic = String(validatingUTF8: rd_kafka_topic_name(rdKafkaMessage.rkt)) else {
