Skip to content

Conversation

@omarkj
Copy link
Contributor

@omarkj omarkj commented Jan 3, 2024

Motivation:

In some cases, for instances when messages are handed out to other processes, it is not reasonable to hold on to the whole Kafka message to be able to commit it at a later time. It consumes much less memory to hold only onto the details needed to commit the message.

Modifications:

Add new public methods that allow users to commit messages if they have a reference to the topic, partition, and offset instead of only the whole message.

Result:

Developers no longer need to keep the whole message around to commit it, only the topic, partition, and offset.

Comment on lines +513 to +515
public func scheduleCommit(topic: String,
partition: KafkaPartition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we have to expose both a topic and a partition argument here. Topic we should be able to get from our config and the consumer should always stick to the same topic. Partition is a bit more interesting due to rebalancing events.

NIT: Could you also fix the line breaks in this whole commit. We normally line break after the ( and before ). This is matching what Xcode does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A client can be configured to consume from multiple topics and partitions (see assign/1) so I do not think there is a way around passing the topic in explicitly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now a client can only be configured to consume from a single topic since we are not publicly exposing assign. Following this reasoning we shouldn't expose topic here either but take it from the config,

Motivation:

In some cases, for instances when messages are handed out
to other processes, it is not reasonable to hold on to the
whole Kafka message to be able to commit it at a later time.
It consumes much less memory to hold only onto the details
needed to commit the message.

Modifications:

Add new public methods that allow users to commit messages
if they have a reference to the topic, partition, and offset
instead of only the whole message.

Result:

Developers no longer need to keep the whole message around to
commit it, only the topic, partition, and offset.
@omarkj omarkj requested a review from FranzBusch January 16, 2024 18:45
Comment on lines +513 to +515
public func scheduleCommit(topic: String,
partition: KafkaPartition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now a client can only be configured to consume from a single topic since we are not publicly exposing assign. Following this reasoning we shouldn't expose topic here either but take it from the config,

@blindspotbounty
Copy link
Collaborator

@omarkj thank you for your PR! There are some notes.

Agree with @FranzBusch that indeed currently assign (.partition) does not allow to accept several topics.
Though it doesn't lack sense to have topic as argument as there is other consumption strategy: .group that allows to subscribe to multiple topics by either explicitly specifying them, either subscribing to wildcard .group(id: "<group id>", topics: ["topic1", "topic2", "^some_prefixed_topic_wildcard.*"]).

However, I would probably organise it as some dedicated struct, i.e. TopicPartitionOffset. That have couple of advantages now and for future use:

  1. That would allow to copy this info in one short from consumer message
  2. Maybe then migrate .partition consumption strategy to the same structure in future
  3. The same structure can be used for rebalance implementation in future

For example:

struct TopicPartitionOffset: Hashable, Equitable? {
    public let topic: String
    public let partition: KafkaPartition
    public let offset: KafkaOffset
    /// no public constructor (at least so far)
}

struct KafkaConsumerMessage {
     var topicPartitionOffset: TopicPartitionOffset { .init(topic: self.topic, partition: self.partition, offset: self.offset) }
}

final class KafkaConsumer {
...
    func commit(_ topicPartitionOffset) throws
}

The usage would be the following:

for try await msg in consumer.messages {
    consumer.commit(msg.topicPartitionOffset) // here or after some time consuming processing now only need to save topicPartitionOffset
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants