Commit messages by topic, partition, offset #160

Open · wants to merge 1 commit into main
54 changes: 50 additions & 4 deletions Sources/Kafka/KafkaConsumer.swift
@@ -492,6 +492,28 @@ public final class KafkaConsumer: Sendable, Service {
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func scheduleCommit(_ message: KafkaConsumerMessage) throws {
try scheduleCommit(
topic: message.topic,
partition: message.partition,
offset: message.offset)
}

/// Mark all messages up to the passed message in the topic as read.
/// Schedules a commit and returns immediately.
/// Any errors encountered after scheduling the commit will be discarded.
///
/// This method is only used for manual offset management.
///
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
///
/// - Parameters:
/// - topic: Topic where the message that should be marked as read resides.
/// - partition: Partition where the message that should be marked as read resides.
/// - offset: Offset of the message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func scheduleCommit(topic: String,
partition: KafkaPartition,
Comment on lines +514 to +515
Contributor
Wondering if we have to expose both a topic and a partition argument here. Topic we should be able to get from our config and the consumer should always stick to the same topic. Partition is a bit more interesting due to rebalancing events.

NIT: Could you also fix the line breaks in this whole commit? We normally line break after the ( and before ). This matches what Xcode does.

Contributor Author
A client can be configured to consume from multiple topics and partitions (see assign/1) so I do not think there is a way around passing the topic in explicitly.

Contributor
Right now a client can only be configured to consume from a single topic since we are not publicly exposing assign. Following this reasoning, we shouldn't expose topic here either but take it from the config.

offset: KafkaOffset) throws {
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
@@ -500,8 +522,10 @@ public final class KafkaConsumer: Sendable, Service {
guard self.configuration.isAutoCommitEnabled == false else {
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}

try client.scheduleCommit(message)
try client.scheduleCommit(
topic: topic,
partition: partition,
offset: offset)
}
}

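For orientation only (not part of this diff): a minimal sketch of how the new fire-and-forget overload might be called when only a message's coordinates are kept around, e.g. to commit in batches. The `messages` async sequence and the batching logic are assumptions made for the example, not something this patch introduces.

```swift
// Hypothetical caller-side sketch: commit every 100 messages using only the
// coordinates of the last handled message. Assumes a running KafkaConsumer
// with isAutoCommitEnabled == false and a `messages` async sequence of
// KafkaConsumerMessage values.
func consume(from consumer: KafkaConsumer) async throws {
    var lastHandled: (topic: String, partition: KafkaPartition, offset: KafkaOffset)?
    var handledSinceLastCommit = 0

    for try await message in consumer.messages {
        // ... process the message ...
        lastHandled = (message.topic, message.partition, message.offset)
        handledSinceLastCommit += 1

        // The original messages may already be gone by the time we commit,
        // but their coordinates are enough for the new overload.
        if handledSinceLastCommit >= 100, let position = lastHandled {
            try consumer.scheduleCommit(
                topic: position.topic,
                partition: position.partition,
                offset: position.offset
            )
            handledSinceLastCommit = 0
        }
    }
}
```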
@@ -521,6 +545,26 @@ public final class KafkaConsumer: Sendable, Service {
/// - message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commit(_ message: KafkaConsumerMessage) async throws {
try await commit(topic: message.topic,
partition: message.partition,
offset: message.offset)
}

/// Mark all messages up to the passed message in the topic as read.
/// Awaits until the commit succeeds or an error is encountered.
///
/// This method is only used for manual offset management.
///
/// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default).
///
/// - Parameters:
/// - topic: Topic where the message that should be marked as read resides.
/// - partition: Partition where the message that should be marked as read resides.
/// - offset: Offset of the message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
public func commit(topic: String,
partition: KafkaPartition,
offset: KafkaOffset) async throws {
let action = self.stateMachine.withLockedValue { $0.commit() }
switch action {
case .throwClosedError:
@@ -530,10 +574,12 @@
throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false")
}

try await client.commit(message)
try await client.commit(topic: topic,
partition: partition,
offset: offset)
}
}

/// This function is used to gracefully shut down a Kafka consumer client.
///
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
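Another illustrative sketch, again not part of the diff: the awaitable `commit(topic:partition:offset:)` suspends until the commit is acknowledged and surfaces any error, which matters when the caller must know the position is durable, for instance right before tearing the consumer down. The `lastPosition` tuple is assumed caller-side bookkeeping.

```swift
// Hypothetical teardown path: ensure the last processed position is durably
// committed (or fail loudly) before the consumer goes away.
func commitFinalPosition(
    on consumer: KafkaConsumer,
    lastPosition: (topic: String, partition: KafkaPartition, offset: KafkaOffset)
) async throws {
    do {
        try await consumer.commit(
            topic: lastPosition.topic,
            partition: lastPosition.partition,
            offset: lastPosition.offset
        )
    } catch let error as KafkaError {
        // Unlike the fire-and-forget path, the error is observable here and
        // can be logged or retried by the caller.
        print("Final commit failed: \(error)")
        throw error
    }
}
```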
66 changes: 38 additions & 28 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -526,41 +526,51 @@ final class RDKafkaClient: Sendable {
}
}

/// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka.
/// Non-blocking "fire-and-forget" commit of a `topic`, `partition`, and `offset` to Kafka.
/// Schedules a commit and returns immediately.
/// Any errors encountered after scheduling the commit will be discarded.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Parameter topic: Topic to commit to
/// - Parameter partition: Partition to commit to
/// - Parameter offset: Offset to commit
/// - Throws: A ``KafkaError`` if scheduling the commit failed.
func scheduleCommit(_ message: KafkaConsumerMessage) throws {
// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset.rawValue + 1)
)

let error = changesList.withListPointer { listPointer in
return rd_kafka_commit(
self.kafkaHandle,
listPointer,
1 // async = true
func scheduleCommit(
topic: String,
partition: KafkaPartition,
offset: KafkaOffset) throws {
// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: topic,
partition: partition,
offset: Int64(offset.rawValue + 1)
)
}

if error != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: error)
}
let error = changesList.withListPointer { listPointer in
return rd_kafka_commit(
self.kafkaHandle,
listPointer,
1 // async = true
)
}

if error != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: error)
}
}

/// Non-blocking **awaitable** commit of a `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
/// - Parameter topic: Topic to commit to
/// - Parameter partition: Partition to commit to
/// - Parameter offset: Offset to commit
/// - Throws: A ``KafkaError`` if the commit failed.
func commit(_ message: KafkaConsumerMessage) async throws {
func commit(
topic: String,
partition: KafkaPartition,
offset: KafkaOffset) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because we do an unretained pass of the captured closure to
// librdkafka which means we have to keep a reference to the closure
Expand All @@ -577,9 +587,9 @@ final class RDKafkaClient: Sendable {
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset.rawValue + 1)
topic: topic,
partition: partition,
offset: Int64(offset.rawValue + 1)
)

// Unretained pass because the reference that librdkafka holds to capturedClosure
@@ -597,7 +607,7 @@
}
}
}

/// Flush any outstanding produce requests.
///
/// - Parameters:
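The comment inside `commit` above explains why the captured closure has to be declared outside of `withCheckedContinuation`: librdkafka only receives an unretained pointer, so the Swift side must hold its own strong reference until the callback has fired. Below is a minimal, library-independent sketch of that pattern; `fakeAsyncCommit` and `ClosureBox` are hypothetical stand-ins for the C API and the library's internal capture type, not code from this repository.

```swift
import Foundation

// Hypothetical stand-in for a C API that stores an opaque pointer plus a C
// function pointer and invokes the callback later (here: immediately).
func fakeAsyncCommit(
    _ opaque: UnsafeMutableRawPointer?,
    _ callback: @convention(c) (UnsafeMutableRawPointer?, Int32) -> Void
) {
    callback(opaque, 0) // pretend the commit succeeded
}

// Box so a Swift closure can travel through an opaque pointer.
final class ClosureBox {
    let body: (Int32) -> Void
    init(_ body: @escaping (Int32) -> Void) { self.body = body }
}

func awaitableCommitSketch() async throws {
    // Keep the box alive in this frame: the C side only holds an unretained
    // pointer, so this reference is what prevents deallocation before the
    // callback has run.
    var box: ClosureBox?
    let errorCode: Int32 = await withCheckedContinuation { continuation in
        let captured = ClosureBox { code in
            continuation.resume(returning: code)
        }
        box = captured
        fakeAsyncCommit(
            Unmanaged.passUnretained(captured).toOpaque(),
            { opaque, code in
                guard let opaque else { return }
                Unmanaged<ClosureBox>.fromOpaque(opaque)
                    .takeUnretainedValue()
                    .body(code)
            }
        )
    }
    _ = box // still referenced here, i.e. alive for the whole await
    guard errorCode == 0 else {
        throw NSError(domain: "CommitSketch", code: Int(errorCode))
    }
}
```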