Skip to content

Commit b42b3f9

Browse files
add approx. transactional api
1 parent 5b07fe2 commit b42b3f9

File tree

7 files changed

+279
-2
lines changed

7 files changed

+279
-2
lines changed

Package.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ let package = Package(
4444
.package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
4545
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
4646
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
47+
.package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
4748
// The zstd Swift package produces warnings that we cannot resolve:
4849
// https://github.com/facebook/zstd/issues/3328
4950
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
@@ -73,6 +74,7 @@ let package = Package(
7374
name: "SwiftKafka",
7475
dependencies: [
7576
"Crdkafka",
77+
.product(name: "ConcurrencyHelpers", package: "package-concurrency-helpers", moduleAliases: ["ConcurrencyHelpers" : "BlockingCallWrapper"]),
7678
.product(name: "NIOCore", package: "swift-nio"),
7779
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
7880
.product(name: "Logging", package: "swift-log"),

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public struct KafkaProducerConfiguration {
3636
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
3737
/// Default: `false`
3838
public var enableIdempotence: Bool = false
39+
40+
public var transactionalId: String?
3941

4042
/// Producer queue options.
4143
public var queue: KafkaConfiguration.QueueOptions = .init()
@@ -108,6 +110,9 @@ extension KafkaProducerConfiguration {
108110
var resultDict: [String: String] = [:]
109111

110112
resultDict["enable.idempotence"] = String(self.enableIdempotence)
113+
if let transactionalId {
114+
resultDict["transactional.id"] = transactionalId
115+
}
111116
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
112117
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
113118
resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,10 @@ public final class KafkaConsumer: Sendable, Service {
301301
}
302302
}
303303
}
304+
305+
func client() throws -> RDKafkaClient {
306+
return try stateMachine.withLockedValue { try $0.client() }
307+
}
304308
}
305309

306310
// MARK: - KafkaConsumer + StateMachine
@@ -548,5 +552,22 @@ extension KafkaConsumer {
548552
return nil
549553
}
550554
}
555+
556+
func client() throws -> RDKafkaClient {
557+
switch self.state {
558+
case .uninitialized:
559+
fatalError("\(#function) invoked while still in state \(self.state)")
560+
case .initializing(let client, _):
561+
return client
562+
case .consuming(let client, _):
563+
return client
564+
case .consumptionStopped(let client):
565+
return client
566+
case .finishing(let client):
567+
return client
568+
case .finished:
569+
throw KafkaError.client(reason: "Client is stopped")
570+
}
571+
}
551572
}
552573
}

Sources/SwiftKafka/KafkaError.swift

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public struct KafkaError: Error, CustomStringConvertible {
5454
}
5555

5656
static func rdKafkaError(
57-
wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line
57+
wrapping error: rd_kafka_resp_err_t, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
5858
) -> KafkaError {
5959
let errorMessage = String(cString: rd_kafka_err2str(error))
6060
return KafkaError(
@@ -143,6 +143,36 @@ public struct KafkaError: Error, CustomStringConvertible {
143143
)
144144
)
145145
}
146+
147+
static func transactionAborted(
148+
reason: String, file: String = #fileID, line: UInt = #line
149+
) -> KafkaError {
150+
return KafkaError(
151+
backing: .init(
152+
code: .transactionAborted, reason: reason, file: file, line: line
153+
)
154+
)
155+
}
156+
157+
static func transactionIncomplete(
158+
reason: String, file: String = #fileID, line: UInt = #line
159+
) -> KafkaError {
160+
return KafkaError(
161+
backing: .init(
162+
code: .transactionIncomplete, reason: reason, file: file, line: line
163+
)
164+
)
165+
}
166+
167+
static func transactionOutOfAttempts(
168+
numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line
169+
) -> KafkaError {
170+
return KafkaError(
171+
backing: .init(
172+
code: .transactionOutOfAttempts, reason: "Out of \(numOfAttempts) attempts", file: file, line: line
173+
)
174+
)
175+
}
146176
}
147177

148178
extension KafkaError {
@@ -162,6 +192,10 @@ extension KafkaError {
162192
case messageConsumption
163193
case topicCreation
164194
case topicDeletion
195+
case transactionAborted
196+
case transactionIncomplete
197+
case notInTransaction // FIXME: maybe add subcode ?
198+
case transactionOutOfAttempts
165199
}
166200

167201
fileprivate var backingCode: BackingCode
@@ -188,6 +222,12 @@ extension KafkaError {
188222
public static let topicCreation = ErrorCode(.topicCreation)
189223
/// Deleting a topic failed.
190224
public static let topicDeletion = ErrorCode(.topicDeletion)
225+
/// Transaction was aborted (can be re-tried from scratch).
226+
public static let transactionAborted = ErrorCode(.transactionAborted)
227+
/// Transaction could not be completed
228+
public static let transactionIncomplete = ErrorCode(.transactionIncomplete)
229+
/// Out of provided number of attempts
230+
public static let transactionOutOfAttempts = ErrorCode(.transactionOutOfAttempts)
191231

192232
public var description: String {
193233
return String(describing: self.backingCode)
@@ -206,17 +246,21 @@ extension KafkaError {
206246
let file: String
207247

208248
let line: UInt
249+
250+
let isFatal: Bool
209251

210252
fileprivate init(
211253
code: KafkaError.ErrorCode,
212254
reason: String,
213255
file: String,
214-
line: UInt
256+
line: UInt,
257+
isFatal: Bool = false
215258
) {
216259
self.code = code
217260
self.reason = reason
218261
self.file = file
219262
self.line = line
263+
self.isFatal = isFatal
220264
}
221265

222266
// Only the error code matters for equality.

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,43 @@ public final class KafkaProducer: Service, Sendable {
265265
return KafkaProducerMessageID(rawValue: newMessageID)
266266
}
267267
}
268+
269+
func initTransactions(timeout: Duration = .seconds(5)) async throws {
270+
guard config.transactionalId != nil else {
271+
throw KafkaError.config(
272+
reason: "Could not initialize transactions because transactionalId is not set in config")
273+
}
274+
// FIXME: maybe add state 'startedWithTransactions'?
275+
let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
276+
try await client.initTransactions(timeout: timeout)
277+
}
278+
279+
func beginTransaction() throws {
280+
let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
281+
try client.beginTransaction()
282+
}
283+
284+
func send(
285+
offsets: RDKafkaTopicPartitionList,
286+
forConsumer consumer: KafkaConsumer,
287+
timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
288+
attempts: UInt64 = .max
289+
) async throws {
290+
let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
291+
let consumerClient = try consumer.client()
292+
try await consumerClient.withKafkaHandlePointer {
293+
try await client.send(attempts: attempts, offsets: offsets, forConsumerKafkaHandle: $0, timeout: timeout)
294+
}
295+
}
296+
297+
func abortTransaction(
298+
timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
299+
attempts: UInt64) async throws {
300+
let client = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
301+
try await client.abortTransaction(attempts: attempts, timeout: timeout)
302+
}
303+
304+
268305
}
269306

270307
// MARK: - KafkaProducer + StateMachine
@@ -455,5 +492,22 @@ extension KafkaProducer {
455492
break
456493
}
457494
}
495+
496+
// TODO:
497+
// 1. add client()
498+
// 2. initTransactions() -> change state to startedWithTransactions
499+
// 3. transactionsClient() -> return client only for startedWithTransactions
500+
501+
502+
func transactionsClient() throws -> RDKafkaClient {
503+
switch self.state {
504+
case .uninitialized:
505+
fatalError("\(#function) invoked while still in state \(self.state)")
506+
case .started(let client, _, _, _):
507+
return client
508+
default:
509+
throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
510+
}
511+
}
458512
}
459513
}

Sources/SwiftKafka/RDKafka/RDKafkaClient.swift

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Crdkafka
16+
import BlockingCallWrapper
1617
import Dispatch
1718
import Logging
1819

@@ -436,6 +437,14 @@ final class RDKafkaClient: Sendable {
436437
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
437438
return try body(self.kafkaHandle)
438439
}
440+
441+
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle with async closure.
442+
/// - Warning: Do not escape the pointer from the closure for later use.
443+
/// - Parameter body: The closure will use the Kafka handle pointer.
444+
@discardableResult
445+
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
446+
return try await body(self.kafkaHandle)
447+
}
439448

440449
/// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
441450
/// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory.
@@ -462,4 +471,138 @@ final class RDKafkaClient: Sendable {
462471

463472
return messageResult
464473
}
474+
475+
func initTransactions(timeout: Duration) async throws {
476+
let result = await forBlockingFunc {
477+
rd_kafka_init_transactions(self.kafkaHandle, Int32(timeout.totalMilliseconds))
478+
}
479+
480+
if result != nil {
481+
let code = rd_kafka_error_code(result)
482+
rd_kafka_error_destroy(result)
483+
throw KafkaError.rdKafkaError(wrapping: code)
484+
}
485+
}
486+
487+
func beginTransaction() throws {
488+
let result = rd_kafka_begin_transaction(kafkaHandle)
489+
if result != nil {
490+
let code = rd_kafka_error_code(result)
491+
rd_kafka_error_destroy(result)
492+
throw KafkaError.rdKafkaError(wrapping: code)
493+
}
494+
}
495+
496+
func send(
497+
attempts: UInt64,
498+
offsets: RDKafkaTopicPartitionList,
499+
forConsumerKafkaHandle consumer: OpaquePointer,
500+
timeout: Duration) async throws {
501+
try await offsets.withListPointer { topicPartitionList in
502+
503+
let consumerMetadata = rd_kafka_consumer_group_metadata(consumer)
504+
defer { rd_kafka_consumer_group_metadata_destroy(consumerMetadata) }
505+
506+
// TODO: actually it should be withing some timeout (like transaction timeout or session timeout)
507+
for idx in 0..<attempts {
508+
let error = await forBlockingFunc {
509+
rd_kafka_send_offsets_to_transaction(self.kafkaHandle, topicPartitionList,
510+
consumerMetadata, timeout.totalMillisecondsOrMinusOne)
511+
}
512+
513+
/* check if offset commit is completed successfully */
514+
if error == nil {
515+
return
516+
}
517+
defer { rd_kafka_error_destroy(error) }
518+
519+
/* check if offset commit is retriable */
520+
if rd_kafka_error_is_retriable(error) == 1 {
521+
continue
522+
}
523+
524+
/* check if transaction need to be aborted */
525+
if rd_kafka_error_txn_requires_abort(error) == 1 {
526+
do {
527+
try await abortTransaction(attempts: attempts - idx, timeout: timeout)
528+
throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
529+
} catch {
530+
throw KafkaError.transactionIncomplete(
531+
reason: "Could not complete or abort transaction with error \(error)")
532+
}
533+
}
534+
let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
535+
throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
536+
}
537+
}
538+
throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
539+
}
540+
541+
func abortTransaction(attempts: UInt64, timeout: Duration) async throws {
542+
for _ in 0..<attempts {
543+
let error = await forBlockingFunc {
544+
rd_kafka_abort_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
545+
}
546+
/* check if transaction abort is completed successfully */
547+
if error == nil {
548+
return
549+
}
550+
defer { rd_kafka_error_destroy(error) }
551+
552+
/* check if transaction abort is retriable */
553+
if rd_kafka_error_is_retriable(error) == 1 {
554+
continue
555+
}
556+
let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
557+
throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
558+
}
559+
throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
560+
}
561+
562+
func commitTransaction(attempts: UInt64, timeout: Duration) async throws {
563+
for idx in 0..<attempts {
564+
let error = await forBlockingFunc {
565+
rd_kafka_commit_transaction(self.kafkaHandle, timeout.totalMillisecondsOrMinusOne)
566+
}
567+
/* check if transaction is completed successfully */
568+
if error == nil {
569+
return
570+
}
571+
/* check if transaction is retriable */
572+
if rd_kafka_error_is_retriable(error) == 1 {
573+
continue
574+
}
575+
defer { rd_kafka_error_destroy(error) }
576+
577+
/* check if transaction need to be aborted */
578+
if rd_kafka_error_txn_requires_abort(error) == 1 {
579+
do {
580+
try await abortTransaction(attempts: attempts - idx, timeout: timeout)
581+
throw KafkaError.transactionAborted(reason: "Transaction aborted and can be started from scratch")
582+
} catch {
583+
throw KafkaError.transactionIncomplete(
584+
reason: "Could not complete or abort transaction with error \(error)")
585+
}
586+
}
587+
/* check if error is fatal */
588+
let isFatal = (rd_kafka_error_is_fatal(error) == 1) // fatal when Producer/Consumer must be restarted
589+
throw KafkaError.rdKafkaError(wrapping: rd_kafka_error_code(error), isFatal: isFatal)
590+
}
591+
throw KafkaError.transactionOutOfAttempts(numOfAttempts: attempts)
592+
}
593+
}
594+
595+
// TODO: tmp, should be in other PRs
596+
extension Duration {
597+
// Internal usage only: librdkafka accepts Int32 as timeouts
598+
var totalMilliseconds: Int32 {
599+
return Int32(self.components.seconds * 1000 + self.components.attoseconds / 1_000_000_000_000_000)
600+
}
601+
602+
var totalMillisecondsOrMinusOne: Int32 {
603+
return max(totalMilliseconds, -1)
604+
}
605+
606+
public static var kafkaUntilEndOfTransactionTimeout: Duration = .milliseconds(-1)
607+
public static var kafkaNoWaitTransaction: Duration = .zero
465608
}

0 commit comments

Comments
 (0)