Commit d090a31

feat(kafka) add detached consumer (#178)
1 parent: 101e3b3, commit: d090a31

File tree

1 file changed: +73 -4 lines changed


kafka/consume.go

Lines changed: 73 additions & 4 deletions
@@ -56,6 +56,75 @@ type ConfluentConsumer struct {
 	eventChan chan kafkalib.Event
 }

+// NewDetachedConsumer creates a Consumer detached from Consumer Groups for partition assignment and rebalance (see NOTE).
+// - NOTE Either a partition or a partition key is required to be set.
+// A detached consumer works outside of consumer groups for partition assignment and rebalance; however, it needs
+// permission on the group coordinator for managing commits, so it still needs a consumer group in the broker.
+// To keep things simple, the default consumer group id is copied from the configured topic name, so make sure you have a
+// policy that grants permission to such a consumer group.
+func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
+	// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+	kafkaConf := conf.baseKafkaConfig()
+	_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. It is mandatory for detached consumers.
+
+	// In case we try to assign an offset out of range (greater than the log-end-offset), the consumer will start consuming from the earliest available offset.
+	_ = kafkaConf.SetKey("auto.offset.reset", "earliest")
+
+	conf.Consumer.GroupID = conf.Topic // Defaults to the topic name. See NOTE above.
+
+	conf.Consumer.Apply(kafkaConf)
+	for _, opt := range opts {
+		opt(kafkaConf)
+	}
+
+	if err := conf.configureAuth(kafkaConf); err != nil {
+		return nil, errors.Wrap(err, "error configuring auth for the Kafka consumer")
+	}
+
+	consumer, err := kafkalib.NewConsumer(kafkaConf)
+	if err != nil {
+		return nil, err
+	}
+
+	if conf.RequestTimeout == 0 {
+		conf.RequestTimeout = DefaultTimeout
+	}
+
+	cc := &ConfluentConsumer{
+		c:    consumer,
+		conf: conf,
+		log:  log,
+	}
+
+	logFields := logrus.Fields{"kafka_topic": cc.conf.Topic}
+
+	if cc.conf.Consumer.Partition == nil && cc.conf.Consumer.PartitionKey == "" {
+		return nil, errors.New("Either a partition or a partition key is required for creating a detached consumer")
+	}
+
+	logFields["kafka_partition_key"] = cc.conf.Consumer.PartitionKey
+	logFields["kafka_partition"] = cc.conf.Consumer.Partition
+
+	if cc.conf.Consumer.Partition != nil {
+		cc.log.WithFields(logFields).Debug("Assigning specified partition")
+		pt := []kafkalib.TopicPartition{
+			{
+				Topic:     &cc.conf.Topic,
+				Partition: *cc.conf.Consumer.Partition,
+			},
+		}
+		return cc, cc.c.Assign(pt)
+	}
+
+	if cc.conf.Consumer.PartitionerAlgorithm == "" {
+		cc.conf.Consumer.PartitionerAlgorithm = PartitionerMurMur2
+	}
+
+	cc.log.WithFields(logFields).Debug("Assigning partition by partition key")
+
+	return cc, cc.AssignPartitionByKey(cc.conf.Consumer.PartitionKey, cc.conf.Consumer.PartitionerAlgorithm)
+}
+
 // NewConsumer creates a ConfluentConsumer based on config.
 // - NOTE if the partition is set and the partition key is not set in config we have no way
 // of knowing where to assign the consumer to in the case of a rebalance
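
For orientation, the snippet below is a minimal usage sketch of the new constructor. It is illustrative only: the import path "example.com/yourmodule/kafka", the topic name, and the partition key are placeholder assumptions; only NewDetachedConsumer, Config, and the Consumer.Partition / Consumer.PartitionKey fields come from the diff above.

package main

import (
	"github.com/sirupsen/logrus"

	kafka "example.com/yourmodule/kafka" // placeholder import path for the package containing consume.go
)

func main() {
	log := logrus.New()

	// Topic is required; NewDetachedConsumer also copies it into the consumer group id.
	conf := kafka.Config{Topic: "orders"}

	// Either Consumer.Partition or Consumer.PartitionKey must be set,
	// otherwise NewDetachedConsumer returns an error.
	conf.Consumer.PartitionKey = "customer-42"

	consumer, err := kafka.NewDetachedConsumer(log, conf)
	if err != nil {
		log.WithError(err).Fatal("could not create detached consumer")
	}
	_ = consumer // consume messages here; with enable.auto.offset.store disabled, offsets must be stored manually after each processed message.
}

Because the group id defaults to the topic name, the broker-side policy has to allow offset commits for a group named after the topic.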
@@ -107,7 +176,7 @@ func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consum
 		logFields["kafka_partition"] = cc.conf.Consumer.Partition
 	}

-	cc.setupRebalanceHandler(cc.conf.Consumer.InitialOffset)
+	cc.setupRebalanceHandler()
 	cc.log.WithFields(logFields).Debug("Subscribing to Kafka topic")
 	if serr := cc.c.Subscribe(cc.conf.Topic, cc.rebalanceHandler); serr != nil {
 		err = errors.Wrap(serr, "error subscribing to topic")
@@ -155,7 +224,7 @@ func (cc *ConfluentConsumer) SeekToTime(t time.Time) error {
 }

 // setupRebalanceHandler does the setup of the rebalance handler
-func (cc *ConfluentConsumer) setupRebalanceHandler(offset *int64) {
+func (cc *ConfluentConsumer) setupRebalanceHandler() {
 	cc.rebalanceHandlerMutex.Lock()
 	defer cc.rebalanceHandlerMutex.Unlock()
@@ -176,8 +245,8 @@
 	// if we have an initial offset we need to set it
 	if cc.conf.Consumer.InitialOffset != nil {
 		once.Do(func() {
-			log.WithField("kafka_offset", *offset).Debug("Skipping Kafka assignment given by coordinator after rebalance in favor of resetting the offset")
-			partitions = kafkalib.TopicPartitions{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(*offset)}}
+			log.WithField("kafka_offset", *cc.conf.Consumer.InitialOffset).Debug("Skipping Kafka assignment given by coordinator after rebalance in favor of resetting the offset")
+			partitions = kafkalib.TopicPartitions{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(*cc.conf.Consumer.InitialOffset)}}
 		})
 	}
 	log.WithField("kafka_partitions", partitions).Debug("Assigning Kafka partitions after rebalance")
