Skip to content

Commit c1a004d

Browse files
authored
feat: allow for asynchronous kafka offset commits (#236)
1 parent 052504f commit c1a004d

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

kafka/consume.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ type Consumer interface {
3636
// CommitMessage commits the offset of a given message.
3737
CommitMessage(msg *kafkalib.Message) error
3838

39+
// StoreOffset stores the offset of a given message. The offset will be asynchronously flushed to kafka every
40+
// `auto.commit.interval.ms`. This method is non-blocking and will be faster than `CommitMessage`, however it has
41+
// weaker delivery guarantees.
42+
StoreOffset(msg *kafkalib.Message) error
43+
3944
// GetMetadata gets the metadata for a consumer.
4045
GetMetadata(allTopics bool) (*kafkalib.Metadata, error)
4146

@@ -337,6 +342,20 @@ func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error {
337342
return errors.Wrap(err, "failed committing Kafka message")
338343
}
339344

345+
// StoreOffset stores the offset of a given message. The offset will be asynchronously flushed to kafka every
346+
// `auto.commit.interval.ms`. This method is non-blocking and will be faster than `CommitMessage`, however it has
347+
// weaker delivery guarantees.
348+
func (cc *ConfluentConsumer) StoreOffset(msg *kafkalib.Message) error {
349+
if msg.TopicPartition.Error != nil {
350+
return errors.New("can't commit errored message")
351+
}
352+
353+
offsets := []kafkalib.TopicPartition{msg.TopicPartition}
354+
offsets[0].Offset++
355+
_, err := cc.c.StoreOffsets(offsets)
356+
return err
357+
}
358+
340359
// Pause pauses consumption of the provided partitions
341360
func (cc *ConfluentConsumer) Pause(p []kafkalib.TopicPartition) error {
342361
err := cc.c.Pause(p)

kafka/kafkatest/utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error {
121121
return nil
122122
}
123123

124+
func (f *FakeKafkaConsumer) StoreOffset(msg *kafkalib.Message) error {
125+
return f.CommitMessage(msg)
126+
}
127+
124128
func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error {
125129
f.msgMu.Lock()
126130
f.offset = offset

0 commit comments

Comments
 (0)