Skip to content

Commit b0289cc

Browse files
authored
feat(kafka): add ConsumerFactory in order to align future developments (#192)
1 parent fb50904 commit b0289cc

File tree

3 files changed

+21
-4
lines changed

3 files changed

+21
-4
lines changed

kafka/consume.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import (
1414
// ErrSeekTimedOut is the error returned when a consumer timed out during Seek.
1515
var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")
1616

17+
// ConsumerFactory creates a Consumer.
18+
// You can use it for postponing the creation of a consumer on runtime. E.g. setting an initial offset that you don't know on boot time.
19+
// NewConsumer and NewDetachedConsumer implements this.
20+
type ConsumerFactory func(logrus.FieldLogger, Config, ...ConfigOpt) (Consumer, error)
21+
1722
// Consumer reads messages from Kafka.
1823
type Consumer interface {
1924
// AssignPartittionByKey sets the current consumer to read from a partion by a hashed key.
@@ -62,7 +67,7 @@ type ConfluentConsumer struct {
6267
// permission on the group coordinator for managing commits, so it needs a consumer group in the broker.
6368
// In order to simplify, the default consumer group id is copied from the configured topic name, so make sure you have a
6469
// policy that gives permission to such consumer group.
65-
func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error) {
70+
func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
6671
// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
6772
kafkaConf := conf.baseKafkaConfig()
6873
_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. It is mandatory for detached consumers.
@@ -128,7 +133,7 @@ func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt)
128133
// NewConsumer creates a ConfluentConsumer based on config.
129134
// - NOTE if the partition is set and the partition key is not set in config we have no way
130135
// of knowing where to assign the consumer to in the case of a rebalance
131-
func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error) {
136+
func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
132137
// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
133138
kafkaConf := conf.baseKafkaConfig()
134139
_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. Otherwise races may happen.)

kafka/consume_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ func TestConsumerFetchMessageContextAwareness(t *testing.T) {
2222
func TestConsumerSeek(t *testing.T) {
2323
c, conf := consumer(t)
2424
defer checkClose(t, c)
25-
require.NoError(t, c.c.Assign(kafkalib.TopicPartitions{{Topic: &conf.Topic, Partition: 0}})) // manually assign partition
25+
require.NoError(t, c.(*ConfluentConsumer).c.Assign(kafkalib.TopicPartitions{{Topic: &conf.Topic, Partition: 0}})) // manually assign partition
2626
require.NoError(t, c.Seek(2))
2727
}
2828

29-
func consumer(t *testing.T) (*ConfluentConsumer, Config) {
29+
func consumer(t *testing.T) (Consumer, Config) {
3030
conf := Config{
3131
Brokers: nil, // No brokers are used for unit test.
3232
Topic: "gotest",

kafka/kafkatest/utils.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,18 @@ import (
1212
"github.com/sirupsen/logrus"
1313
)
1414

15+
func FakeKafkaConsumerFactory(distri <-chan *kafkalib.Message) kafka.ConsumerFactory {
16+
return func(log logrus.FieldLogger, _ kafka.Config, _ ...kafka.ConfigOpt) (kafka.Consumer, error) {
17+
return NewFakeKafkaConsumer(log, distri), nil
18+
}
19+
}
20+
21+
func KafkaConsumerFactoryFromConsumer(c kafka.Consumer) kafka.ConsumerFactory {
22+
return func(_ logrus.FieldLogger, _ kafka.Config, _ ...kafka.ConfigOpt) (kafka.Consumer, error) {
23+
return c, nil
24+
}
25+
}
26+
1527
func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer) {
1628
distri := make(chan *kafkalib.Message, 200)
1729
rdr := NewFakeKafkaConsumer(log, distri)

0 commit comments

Comments
 (0)