Skip to content

Commit f5bfbe2

Browse files
add state for initalized transactions
1 parent b42b3f9 commit f5bfbe2

File tree

2 files changed

+43
-16
lines changed

2 files changed

+43
-16
lines changed

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,7 @@ public final class KafkaProducer: Service, Sendable {
271271
throw KafkaError.config(
272272
reason: "Could not initialize transactions because transactionalId is not set in config")
273273
}
274-
// FIXME: maybe add state 'startedWithTransactions'?
275-
let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
274+
let client = try self.stateMachine.withLockedValue { try $0.initTransactions() }
276275
try await client.initTransactions(timeout: timeout)
277276
}
278277

@@ -329,6 +328,18 @@ extension KafkaProducer {
329328
source: Producer.Source?,
330329
topicHandles: RDKafkaTopicHandles
331330
)
331+
/// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
332+
///
333+
/// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
334+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
335+
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
336+
/// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
337+
case startedWithTransactions(
338+
client: RDKafkaClient,
339+
messageIDCounter: UInt,
340+
source: Producer.Source?,
341+
topicHandles: RDKafkaTopicHandles
342+
)
332343
/// Producer is still running but the acknowledgement asynchronous sequence was terminated.
333344
/// All incoming acknowledgements will be dropped.
334345
///
@@ -396,7 +407,7 @@ extension KafkaProducer {
396407
switch self.state {
397408
case .uninitialized:
398409
fatalError("\(#function) invoked while still in state \(self.state)")
399-
case .started(let client, _, let source, _):
410+
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
400411
return .pollAndYield(client: client, source: source)
401412
case .consumptionStopped(let client):
402413
return .pollWithoutYield(client: client)
@@ -439,6 +450,19 @@ extension KafkaProducer {
439450
newMessageID: newMessageID,
440451
topicHandles: topicHandles
441452
)
453+
case .startedWithTransactions(let client, let messageIDCounter, let source, let topicHandles):
454+
let newMessageID = messageIDCounter + 1
455+
self.state = .startedWithTransactions(
456+
client: client,
457+
messageIDCounter: newMessageID,
458+
source: source,
459+
topicHandles: topicHandles
460+
)
461+
return .send(
462+
client: client,
463+
newMessageID: newMessageID,
464+
topicHandles: topicHandles
465+
)
442466
case .consumptionStopped:
443467
throw KafkaError.connectionClosed(reason: "Sequence consuming acknowledgements was abruptly terminated, producer closed")
444468
case .finishing:
@@ -464,7 +488,7 @@ extension KafkaProducer {
464488
fatalError("\(#function) invoked while still in state \(self.state)")
465489
case .consumptionStopped:
466490
fatalError("messageSequenceTerminated() must not be invoked more than once")
467-
case .started(let client, _, let source, _):
491+
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
468492
self.state = .consumptionStopped(client: client)
469493
return .finishSource(source: source)
470494
case .finishing(let client, let source):
@@ -484,7 +508,7 @@ extension KafkaProducer {
484508
switch self.state {
485509
case .uninitialized:
486510
fatalError("\(#function) invoked while still in state \(self.state)")
487-
case .started(let client, _, let source, _):
511+
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
488512
self.state = .finishing(client: client, source: source)
489513
case .consumptionStopped(let client):
490514
self.state = .finishing(client: client, source: nil)
@@ -493,21 +517,25 @@ extension KafkaProducer {
493517
}
494518
}
495519

496-
// TODO:
497-
// 1. add client()
498-
// 2. initTransactions() -> change state to startedWithTransactions
499-
// 3. transactionsClient() -> return client only for startedWithTransactions
500-
501-
502-
func transactionsClient() throws -> RDKafkaClient {
520+
mutating func initTransactions() throws -> RDKafkaClient {
503521
switch self.state {
504522
case .uninitialized:
505523
fatalError("\(#function) invoked while still in state \(self.state)")
506-
case .started(let client, _, _, _):
524+
case .started(let client, let messageIDCounter, let source, let topicHandles):
525+
self.state = .startedWithTransactions(client: client, messageIDCounter: messageIDCounter, source: source, topicHandles: topicHandles)
507526
return client
508-
default:
527+
case .startedWithTransactions:
528+
throw KafkaError.config(reason: "Transactions were already initialized")
529+
case .consumptionStopped, .finishing, .finished:
509530
throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
510531
}
511532
}
533+
534+
func transactionsClient() throws -> RDKafkaClient {
535+
guard case let .startedWithTransactions(client, _, _, _) = self.state else {
536+
throw KafkaError.transactionAborted(reason: "Transactions were not initialized or producer is being stopped")
537+
}
538+
return client
539+
}
512540
}
513541
}

Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ final class RDKafkaClient: Sendable {
474474

475475
func initTransactions(timeout: Duration) async throws {
476476
let result = await forBlockingFunc {
477-
rd_kafka_init_transactions(self.kafkaHandle, Int32(timeout.totalMilliseconds))
477+
rd_kafka_init_transactions(self.kafkaHandle, timeout.totalMilliseconds)
478478
}
479479

480480
if result != nil {
@@ -592,7 +592,6 @@ final class RDKafkaClient: Sendable {
592592
}
593593
}
594594

595-
// TODO: tmp, should be in other PRs
596595
extension Duration {
597596
// Internal usage only: librdkafka accepts Int32 as timeouts
598597
var totalMilliseconds: Int32 {

0 commit comments

Comments
 (0)