This repository was archived by the owner on Oct 7, 2022. It is now read-only.

Commit 7a872fd

feat(consumer): add batch send deadline

1 parent 0194b1f

File tree: 5 files changed, +28 -5 lines

- README.md (+1)
- cmd/injector.go (+1)
- src/injector/injector.go (+6)
- src/kafka/config.go (+1)
- src/kafka/consumer.go (+19 -5)

README.md (+1)

```diff
@@ -29,6 +29,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
 - `ELASTICSEARCH_DISABLE_SNIFFING` if set to "true", the client will not sniff Elasticsearch nodes during the node discovery process. Defaults to false. **OPTIONAL**
 - `KAFKA_CONSUMER_CONCURRENCY` Number of parallel goroutines working as a consumer. Default value is 1 **OPTIONAL**
 - `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to Elasticsearch (for each goroutine). Default value is 100 **OPTIONAL**
+- `KAFKA_CONSUMER_BATCH_DEADLINE` If no new records are added to the batch after this time duration, the batch will be sent to Elasticsearch. Default value is 1m **OPTIONAL**
 - `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
 - `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to Elasticsearch. Defaults to empty string. **OPTIONAL**
 - `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
```
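The new deadline is parsed with Go's `time.ParseDuration` (as the src/injector/injector.go diff below shows), so values like `30s`, `1m`, or `1m30s` are valid. A minimal sketch of how the setting resolves, assuming only the documented 1m default; the variable name `deadline` is illustrative, not from the codebase:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// Unset or malformed values fall back to the documented default of 1m.
	deadline, err := time.ParseDuration(os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"))
	if err != nil {
		deadline = time.Minute
	}
	fmt.Println("effective batch deadline:", deadline)
}
```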

cmd/injector.go (+1)

```diff
@@ -42,6 +42,7 @@ func main() {
 		ConsumerGroup:         os.Getenv("KAFKA_CONSUMER_GROUP"),
 		Concurrency:           os.Getenv("KAFKA_CONSUMER_CONCURRENCY"),
 		BatchSize:             os.Getenv("KAFKA_CONSUMER_BATCH_SIZE"),
+		BatchDeadline:         os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"),
 		BufferSize:            os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
 		MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
 		RecordType:            os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
```

src/injector/injector.go (+6)

```diff
@@ -22,6 +22,11 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		level.Warn(logger).Log("err", err, "message", "failed to get consumer batch size")
 		batchSize = 100
 	}
+	batchDeadline, err := time.ParseDuration(kafkaConfig.BatchDeadline)
+	if err != nil {
+		level.Warn(logger).Log("err", err, "message", "failed to get consumer batch deadline")
+		batchDeadline = time.Minute
+	}
 	metricsUpdateInterval, err := time.ParseDuration(kafkaConfig.MetricsUpdateInterval)
 	if err != nil {
 		level.Warn(logger).Log("err", err, "message", "failed to get consumer metrics update interval")
@@ -54,6 +59,7 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		Logger:                logger,
 		Concurrency:           concurrency,
 		BatchSize:             batchSize,
+		BatchDeadline:         batchDeadline,
 		MetricsUpdateInterval: metricsUpdateInterval,
 		BufferSize:            bufferSize,
 		IncludeKey:            includeKey,
```
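The added block repeats the parse-and-fallback pattern already used for `MetricsUpdateInterval` just below it. A hypothetical helper, not part of this commit, that factors out the shared behavior might look like:

```go
import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// durationOrDefault is a hypothetical refactoring: it parses raw as a
// time.Duration and, on failure, logs a warning and returns def instead.
func durationOrDefault(logger log.Logger, raw, name string, def time.Duration) time.Duration {
	d, err := time.ParseDuration(raw)
	if err != nil {
		level.Warn(logger).Log("err", err, "message", "failed to get "+name)
		return def
	}
	return d
}
```

With it, the new lines would reduce to `batchDeadline := durationOrDefault(logger, kafkaConfig.BatchDeadline, "consumer batch deadline", time.Minute)`.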

src/kafka/config.go (+1)

```diff
@@ -10,6 +10,7 @@ type Config struct {
 	ConsumerGroup         string
 	Concurrency           string
 	BatchSize             string
+	BatchDeadline         string
 	MetricsUpdateInterval string
 	BufferSize            string
 	RecordType            string
```

src/kafka/consumer.go (+19 -5)

```diff
@@ -2,8 +2,8 @@ package kafka
 
 import (
 	"context"
-	"os"
 	"errors"
+	"os"
 
 	"time"
 
@@ -12,9 +12,9 @@ import (
 	"github.com/go-kit/kit/endpoint"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
-	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 )
 
 type Notification int32
@@ -41,6 +41,7 @@ type Consumer struct {
 	Logger                log.Logger
 	Concurrency           int
 	BatchSize             int
+	BatchDeadline         time.Duration
 	MetricsUpdateInterval time.Duration
 	BufferSize            int
 	IncludeKey            bool
@@ -80,8 +81,9 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
 	defer consumer.Close()
 
 	buffSize := k.consumer.BatchSize
+	batchDeadline := k.consumer.BatchDeadline
 	for i := 0; i < concurrency; i++ {
-		go k.worker(consumer, buffSize, notifications)
+		go k.worker(consumer, buffSize, batchDeadline, notifications)
 	}
 	go func() {
 		for {
@@ -134,15 +136,24 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
 	}
 }
 
-func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications chan<- Notification) {
+func batchDeadlineExceeded(lastReceivedMsg *time.Time, batchDeadline time.Duration) bool {
+	if lastReceivedMsg == nil {
+		return false
+	}
+
+	return time.Now().Sub(*lastReceivedMsg) > batchDeadline
+}
+
+func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, batchDeadline time.Duration, notifications chan<- Notification) {
 	buf := make([]*sarama.ConsumerMessage, buffSize)
 	var decoded []*models.Record
+	var lastReceivedMsg *time.Time
 	idx := 0
 	for {
 		kafkaMsg := <-k.consumerCh
 		buf[idx] = kafkaMsg
 		idx++
-		for idx == buffSize {
+		if idx == buffSize || batchDeadlineExceeded(lastReceivedMsg, batchDeadline) {
 			if decoded == nil {
 				for _, msg := range buf {
 					req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
@@ -172,7 +183,10 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
 				consumer.MarkOffset(msg, "") // mark message as processed
 			}
 			decoded = nil
+			lastReceivedMsg = nil
 			idx = 0
+		} else {
+			*lastReceivedMsg = time.Now()
 		}
 	}
 }
```
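Two caveats about the new worker logic are worth noting. First, the deadline is only evaluated after `<-k.consumerCh` delivers another message, so an idle topic will not flush a partial batch until the next record arrives. Second, `lastReceivedMsg` is declared as a nil `*time.Time`, and the `else` branch assigns through it (`*lastReceivedMsg = time.Now()`), which panics on the first message that does not fill the batch. A self-contained sketch of a nil-safe update, reusing the commit's `batchDeadlineExceeded` helper (with the equivalent `time.Since` in place of `time.Now().Sub`):

```go
package main

import (
	"fmt"
	"time"
)

// batchDeadlineExceeded mirrors the helper added in this commit: a nil
// timestamp means no partial batch is pending, so no flush is due.
func batchDeadlineExceeded(lastReceivedMsg *time.Time, batchDeadline time.Duration) bool {
	if lastReceivedMsg == nil {
		return false
	}
	return time.Since(*lastReceivedMsg) > batchDeadline
}

func main() {
	var lastReceivedMsg *time.Time // nil until the first partial batch

	// Nil-safe update: repoint the pointer at a fresh value instead of
	// writing through it; *lastReceivedMsg = time.Now() would panic here.
	now := time.Now()
	lastReceivedMsg = &now

	fmt.Println(batchDeadlineExceeded(lastReceivedMsg, time.Minute)) // false: just updated
}
```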
