Skip to content

Commit 30fe1bf

Browse files
committed
Review Franz
Motivation: We don't want to expose source and poll closure from `KafkaPollingSystem`. Modifications: * add new `func initialize(backPressureStrategy:, pollClosure:)` to `KafkaPollingSystem` to support delayed initialization (needed because the `AsyncSequence` relies on `KafkaPollingSystem` and `KafkaPollingSystem` relies on the `AsyncSequence.Source` (circular dependency) * `KafkaPollingSystem.StateMachine`: add state `.uninitialized` for when the `StateMachine` exists but `initialize()` has not yet been invoked on it * stop exposing `pollClosure` and `source` variables from `KafkaPollingSystem` / `KafkaPollingSystem.StateMachine` * move `yield(_:)` state functionality inside of `KafkaPollingSystem.StateMachine` * make `RDKafkaConfig.convertMessageToAcknowledgementResult` `private` * add new error type: `KafkaError.pollLoop` * throw error when invoking `KafkaPollingSystem.run()` on closed poll loop
1 parent 797e35a commit 30fe1bf

File tree

5 files changed

+297
-297
lines changed

5 files changed

+297
-297
lines changed

Sources/SwiftKafka/KafkaError.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,16 @@ public struct KafkaError: Error, Hashable, CustomStringConvertible {
101101
)
102102
}
103103

104+
static func pollLoop(
105+
reason: String, file: String = #fileID, line: UInt = #line
106+
) -> KafkaError {
107+
return KafkaError(
108+
backing: .init(
109+
code: .pollLoop, reason: reason, file: file, line: line
110+
)
111+
)
112+
}
113+
104114
static func connectionClosed(
105115
reason: String, file: String = #fileID, line: UInt = #line
106116
) -> KafkaError {
@@ -155,6 +165,7 @@ extension KafkaError {
155165
case config
156166
case topicConfig
157167
case connectionClosed
168+
case pollLoop
158169
case client
159170
case messageConsumption
160171
case topicCreation
@@ -177,6 +188,8 @@ extension KafkaError {
177188
public static let topicConfig = ErrorCode(.topicConfig)
178189
/// Something or somebody tried to access a client that ended its connection to Kafka.
179190
public static let connectionClosed = ErrorCode(.connectionClosed)
191+
/// An error occured inside the poll loop.
192+
public static let pollLoop = ErrorCode(.pollLoop)
180193
/// Establishing a connection to Kafka failed.
181194
public static let client = ErrorCode(.client)
182195
/// Consuming a message failed.

0 commit comments

Comments
 (0)