Skip to content

Commit 351625f

Browse files
committed
in_kafka: make pull timeout configurable
1 parent 0aa05f1 commit 351625f

File tree

2 files changed

+17
-8
lines changed

2 files changed

+17
-8
lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,
182182

183183

184184
if(!ctx->enable_auto_commit) {
185-
/* TO-DO: commit the record based on `ret` */
186-
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
185+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
186+
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
187+
}
187188
}
188189

189190
/* Break from the loop when reaching the limit of polling if available */
@@ -225,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins,
225226
char errstr[512];
226227
(void) data;
227228
char conf_val[16];
229+
size_t dsize;
228230

229231
/* Allocate space for the configuration context */
230232
ctx = flb_malloc(sizeof(struct flb_in_kafka_config));
@@ -252,13 +254,20 @@ static int in_kafka_init(struct flb_input_instance *ins,
252254
* -> minimize the delay we might create
253255
* b) run in our own thread:
254256
* -> optimize for throuput and relay on 'fetch.wait.max.ms'
255-
* which is set to 500 by default default. lets set it to
256-
* twice that so that increasing fetch.wait.max.ms still
257-
* has an effect.
257+
* which is set to 500 by default default. wa algin our
258+
* timeout with what is set for 'fetch.wait.max.ms'
258259
*/
259260
ctx->poll_timeount_ms = 1;
260-
if(ins->is_threaded) {
261-
ctx->poll_timeount_ms = 1000;
261+
if (ins->is_threaded) {
262+
ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout
263+
264+
// align our timeout with what was configured for fetch.wait.max.ms
265+
dsize = sizeof(conf_val);
266+
res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize);
267+
if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) {
268+
// add 50ms so kafa triggers timout
269+
ctx->poll_timeount_ms = atoi(conf_val) + 50;
270+
}
262271
}
263272

264273
if (ctx->buffer_max_size > 0) {
@@ -451,7 +460,6 @@ static struct flb_config_map config_map[] = {
451460
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
452461
"Rely on kafka auto-commit and commit messages in batches"
453462
},
454-
/* EOF */
455463
{0}
456464
};
457465

plugins/in_kafka/in_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
3434
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"
3535
#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false"
36+
#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10%
3637

3738
enum {
3839
FLB_IN_KAFKA_FORMAT_NONE,

0 commit comments

Comments
 (0)