Skip to content

Commit 5065315

Browse files
author
Ómar Kjartan Yasin
committed
Commit messages by topic, partition, offset
Motivation: In some cases, for instances when messages are handed out to other processes, it is not reasonable to hold on to the whole Kafka message to be able to commit it at a later time. It consumes much less memory to hold only onto the details needed to commit the message. Modifications: Add new public methods that allow users to commit messages if they have a reference to the topic, partition, and offset instead of only the whole message. Result: Developers no longer need to keep the whole message around to commit it, only the topic, partition, and offset.
1 parent 51c5f72 commit 5065315

File tree

2 files changed

+86
-31
lines changed

2 files changed

+86
-31
lines changed

Sources/Kafka/KafkaConsumer.swift

+48-3
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,27 @@ public final class KafkaConsumer: Sendable, Service {
492492
/// - message: Last received message that shall be marked as read.
493493
/// - Throws: A ``KafkaError`` if committing failed.
494494
public func scheduleCommit(_ message: KafkaConsumerMessage) throws {
495+
try scheduleCommit(topic: message.topic,
496+
partition: message.partition,
497+
offset: message.offset)
498+
}
499+
500+
/// Mark all messages up to the passed message in the topic as read.
501+
/// Schedules a commit and returns immediately.
502+
/// Any errors encountered after scheduling the commit will be discarded.
503+
///
504+
/// This method is only used for manual offset management.
505+
///
506+
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
507+
///
508+
/// - Parameters:
509+
/// - topic: Topic where the message that should be marked as read resides.
510+
/// - partition: Partition where the message that should be marked as read resides.
511+
/// - offset: Offset of the message that shall be marked as read.
512+
/// - Throws: A ``KafkaError`` if committing failed.
513+
public func scheduleCommit(topic: String,
514+
partition: KafkaPartition,
515+
offset: KafkaOffset) throws {
495516
let action = self.stateMachine.withLockedValue { $0.commit() }
496517
switch action {
497518
case .throwClosedError:
@@ -501,7 +522,9 @@ public final class KafkaConsumer: Sendable, Service {
501522
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
502523
}
503524

504-
try client.scheduleCommit(message)
525+
try client.scheduleCommit(topic: topic,
526+
partition: partition,
527+
offset: offset)
505528
}
506529
}
507530

@@ -521,6 +544,26 @@ public final class KafkaConsumer: Sendable, Service {
521544
/// - message: Last received message that shall be marked as read.
522545
/// - Throws: A ``KafkaError`` if committing failed.
523546
public func commit(_ message: KafkaConsumerMessage) async throws {
547+
try await commit(topic: message.topic,
548+
partition: message.partition,
549+
offset: message.offset)
550+
}
551+
552+
/// Mark all messages up to the passed message in the topic as read.
553+
/// Awaits until the commit succeeds or an error is encountered.
554+
///
555+
/// This method is only used for manual offset management.
556+
///
557+
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
558+
///
559+
/// - Parameters:
560+
/// - topic: Topic where the message that should be marked as read resides.
561+
/// - partition: Partition where the message that should be marked as read resides.
562+
/// - offset: Offset of the message that shall be marked as read.
563+
/// - Throws: A ``KafkaError`` if committing failed.
564+
public func commit(topic: String,
565+
partition: KafkaPartition,
566+
offset: KafkaOffset) async throws {
524567
let action = self.stateMachine.withLockedValue { $0.commit() }
525568
switch action {
526569
case .throwClosedError:
@@ -530,10 +573,12 @@ public final class KafkaConsumer: Sendable, Service {
530573
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
531574
}
532575

533-
try await client.commit(message)
576+
try await client.commit(topic: topic,
577+
partition: partition,
578+
offset: offset)
534579
}
535580
}
536-
581+
537582
/// This function is used to gracefully shut down a Kafka consumer client.
538583
///
539584
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``

Sources/Kafka/RDKafka/RDKafkaClient.swift

+38-28
Original file line numberDiff line numberDiff line change
@@ -526,41 +526,51 @@ final class RDKafkaClient: Sendable {
526526
}
527527
}
528528

529-
/// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka.
529+
/// Non-blocking "fire-and-forget" commit of a `topic`, `partition`, and `offset` to Kafka.
530530
/// Schedules a commit and returns immediately.
531531
/// Any errors encountered after scheduling the commit will be discarded.
532532
///
533-
/// - Parameter message: Last received message that shall be marked as read.
533+
/// - Parameter topic: Topic to commit to
534+
/// - Parameter partition: Partition to commit to
535+
/// - Parameter offset: Offset to commit
534536
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
535-
func scheduleCommit(_ message: KafkaConsumerMessage) throws {
536-
// The offset committed is always the offset of the next requested message.
537-
// Thus, we increase the offset of the current message by one before committing it.
538-
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
539-
let changesList = RDKafkaTopicPartitionList()
540-
changesList.setOffset(
541-
topic: message.topic,
542-
partition: message.partition,
543-
offset: Int64(message.offset.rawValue + 1)
544-
)
545-
546-
let error = changesList.withListPointer { listPointer in
547-
return rd_kafka_commit(
548-
self.kafkaHandle,
549-
listPointer,
550-
1 // async = true
537+
func scheduleCommit(
538+
topic: String,
539+
partition: KafkaPartition,
540+
offset: KafkaOffset) throws {
541+
// The offset committed is always the offset of the next requested message.
542+
// Thus, we increase the offset of the current message by one before committing it.
543+
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
544+
let changesList = RDKafkaTopicPartitionList()
545+
changesList.setOffset(
546+
topic: topic,
547+
partition: partition,
548+
offset: Int64(offset.rawValue + 1)
551549
)
552-
}
553550

554-
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
555-
throw KafkaError.rdKafkaError(wrapping: error)
556-
}
551+
let error = changesList.withListPointer { listPointer in
552+
return rd_kafka_commit(
553+
self.kafkaHandle,
554+
listPointer,
555+
1 // async = true
556+
)
557+
}
558+
559+
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
560+
throw KafkaError.rdKafkaError(wrapping: error)
561+
}
557562
}
558563

559564
/// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
560565
///
561-
/// - Parameter message: Last received message that shall be marked as read.
566+
/// - Parameter topic: Topic to commit to
567+
/// - Parameter partition: Partition to commit to
568+
/// - Parameter offset: Offset to commit
562569
/// - Throws: A ``KafkaError`` if the commit failed.
563-
func commit(_ message: KafkaConsumerMessage) async throws {
570+
func commit(
571+
topic: String,
572+
partition: KafkaPartition,
573+
offset: KafkaOffset) async throws {
564574
// Declare captured closure outside of withCheckedContinuation.
565575
// We do that because do an unretained pass of the captured closure to
566576
// librdkafka which means we have to keep a reference to the closure
@@ -577,9 +587,9 @@ final class RDKafkaClient: Sendable {
577587
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
578588
let changesList = RDKafkaTopicPartitionList()
579589
changesList.setOffset(
580-
topic: message.topic,
581-
partition: message.partition,
582-
offset: Int64(message.offset.rawValue + 1)
590+
topic: topic,
591+
partition: partition,
592+
offset: Int64(offset.rawValue + 1)
583593
)
584594

585595
// Unretained pass because the reference that librdkafka holds to capturedClosure
@@ -597,7 +607,7 @@ final class RDKafkaClient: Sendable {
597607
}
598608
}
599609
}
600-
610+
601611
/// Flush any outstanding produce requests.
602612
///
603613
/// - Parameters:

0 commit comments

Comments
 (0)