Skip to content

Commit 6357338

Browse files
committed
in_kafka: increase the poll-timeout if we run in own own thread
having 1ms timeout might make sense if the input plugin is running in the main thread (not introducing delay for others). but if we run in our very own thread then we should not over- ride the fetch.wait.max.ms configuration value from the kafka-consumer. this in conjuntion with using autocommit again boosts the throuhput significantly. Signed-off-by: CoreidCC <[email protected]>
1 parent 8f7c509 commit 6357338

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
161161
ret = FLB_EVENT_ENCODER_SUCCESS;
162162

163163
while (ret == FLB_EVENT_ENCODER_SUCCESS) {
164-
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
164+
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms);
165165

166166
if (!rkm) {
167167
break;
@@ -246,6 +246,21 @@ static int in_kafka_init(struct flb_input_instance *ins,
246246
goto init_error;
247247
}
248248

249+
/* Set the kafka poll timeout dependend on wether we run in our own
250+
* or in the main event thread.
251+
* a) run in main event thread:
252+
* -> minimize the delay we might create
253+
* b) run in our own thread:
254+
* -> optimize for throuput and relay on 'fetch.wait.max.ms'
255+
* which is set to 500 by default default. lets set it to
256+
* twice that so that increasing fetch.wait.max.ms still
257+
* has an effect.
258+
*/
259+
ctx->poll_timeount_ms = 1;
260+
if(ins->is_threaded) {
261+
ctx->poll_timeount_ms = 1000;
262+
}
263+
249264
if (ctx->buffer_max_size > 0) {
250265
ctx->polling_threshold = ctx->buffer_max_size;
251266

plugins/in_kafka/in_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct flb_in_kafka_config {
5050
size_t buffer_max_size; /* Maximum size of chunk allocation */
5151
size_t polling_threshold;
5252
bool enable_auto_commit;
53+
int poll_timeount_ms;
5354
};
5455

5556
#endif

0 commit comments

Comments
 (0)