@@ -181,7 +181,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
181
181
rd_kafka_message_destroy (rkm );
182
182
183
183
184
- if (!ctx -> enable_auto_commit ) {
184
+ if (!ctx -> enable_auto_commit ) {
185
185
if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
186
186
rd_kafka_commit (ctx -> kafka .rk , NULL , 0 );
187
187
}
@@ -248,24 +248,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
248
248
goto init_error ;
249
249
}
250
250
251
- /* Set the kafka poll timeout dependend on wether we run in our own
252
- * or in the main event thread.
253
- * a) run in main event thread:
254
- * -> minimize the delay we might create
255
- * b) run in our own thread:
256
- * -> optimize for throuput and relay on 'fetch.wait.max.ms'
257
- * which is set to 500 by default default. wa algin our
258
- * timeout with what is set for 'fetch.wait.max.ms'
259
- */
251
+ /* Set the kafka poll timeout depending on whether we run in our own
252
+ or in the main event thread.
253
+ a) run in main event thread:
254
+ -> minimize the delay we might create
255
+ b) run in our own thread:
256
+ -> optimize for throughput and relay on 'fetch.wait.max.ms'
257
+ which is set to 500 by default. we align our
258
+ timeout with what is set for 'fetch.wait.max.ms' */
260
259
ctx -> poll_timeount_ms = 1 ;
261
260
if (ins -> is_threaded ) {
262
- ctx -> poll_timeount_ms = 550 ; // ensure kafa triggers timeout
261
+ ctx -> poll_timeount_ms = 550 ; /* ensure kafa triggers timeout */
263
262
264
- // align our timeout with what was configured for fetch.wait.max.ms
263
+ /* align our timeout with what was configured for fetch.wait.max.ms */
265
264
dsize = sizeof (conf_val );
266
265
res = rd_kafka_conf_get (kafka_conf , "fetch.wait.max.ms" , conf_val , & dsize );
267
266
if (res == RD_KAFKA_CONF_OK && dsize <= sizeof (conf_val )) {
268
- // add 50ms so kafa triggers timout
267
+ /* add 50ms so kafa triggers timeout */
269
268
ctx -> poll_timeount_ms = atoi (conf_val ) + 50 ;
270
269
}
271
270
}
0 commit comments