Skip to content

Commit 052504f

Browse files
author
Kelsey
authored
feat: Expose Pause and Resume through kafka.Consumer interface (#230)
1 parent 0f9f120 commit 052504f

File tree

3 files changed

+29
-0
lines changed

3 files changed

+29
-0
lines changed

kafka/consume.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ type Consumer interface {
4747

4848
// SeekToTime seeks to the specified time.
4949
SeekToTime(t time.Time) error
50+
51+
// Pause pauses consumption of the provided partitions
52+
Pause(p []kafkalib.TopicPartition) error
53+
54+
// Resume resumes consumption of the provided partitions
55+
Resume(p []kafkalib.TopicPartition) error
5056
}
5157

5258
// ConfluentConsumer implements Consumer interface.
@@ -331,6 +337,18 @@ func (cc *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error {
331337
return errors.Wrap(err, "failed committing Kafka message")
332338
}
333339

340+
// Pause pauses consumption of the provided partitions
341+
func (cc *ConfluentConsumer) Pause(p []kafkalib.TopicPartition) error {
342+
err := cc.c.Pause(p)
343+
return errors.Wrap(err, "failed to pause Kafka topic")
344+
}
345+
346+
// Resume resumes consumption of the provided partitions
347+
func (cc *ConfluentConsumer) Resume(p []kafkalib.TopicPartition) error {
348+
err := cc.c.Resume(p)
349+
return errors.Wrap(err, "failed to resume Kafka topic")
350+
}
351+
334352
// Close closes the consumer.
335353
func (cc *ConfluentConsumer) Close() error {
336354
return cc.c.Close()

kafka/integration_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ func TestIntegration(t *testing.T) {
9292
assert.Equal([]byte(k), m.Key, "Partition to read from: %d, Msg: %+v", p, m)
9393
assert.Equal([]byte(v), m.Value, "Partition to read from: %d, Msg: %+v", p, m)
9494

95+
assert.NoError(c.Pause([]kafkalib.TopicPartition{m.TopicPartition}))
96+
assert.NoError(c.Resume([]kafkalib.TopicPartition{m.TopicPartition}))
97+
9598
assert.NoError(c.CommitMessage(m))
9699
}
97100

kafka/kafkatest/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error {
156156
return nil // noop
157157
}
158158

159+
func (f *FakeKafkaConsumer) Pause(p []kafkalib.TopicPartition) error {
160+
return nil // noop
161+
}
162+
163+
func (f *FakeKafkaConsumer) Resume(p []kafkalib.TopicPartition) error {
164+
return nil // noop
165+
}
166+
159167
func (f *FakeKafkaConsumer) Close() error {
160168
close(f.commits)
161169
return nil

0 commit comments

Comments
 (0)