Skip to content

Commit 2c542b0

Browse files
committed
[KIP-1102] Enable clients to rebootstrap based on timeout or error code
1 parent 5bff66d commit 2c542b0

9 files changed

+60
-7
lines changed

Diff for: CONFIGURATION.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ receive.message.max.bytes | * | 1000 .. 2147483647 | 100000
1313
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
1414
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
1515
metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client fails with a fatal error. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. <br>*Type: enum value*
16+
metadata.recovery.rebootstrap.trigger.ms | * | 0 .. 2147483647 | 300000 | low | If a client configured to rebootstrap using `metadata.recovery.strategy=rebootstrap` is unable to obtain metadata from any of the brokers for this interval, the client repeats the bootstrap process using the `bootstrap.servers` configuration and brokers added through `rd_kafka_brokers_add()`. <br>*Type: integer*
1617
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
1718
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
1819
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing up to `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it defaults to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*

Diff for: INTRODUCTION.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -2058,6 +2058,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
20582058
| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early Access |
20592059
| KIP-899 - Allow producer and consumer clients to rebootstrap | 3.8.0 | Supported |
20602060
| KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |
2061+
| KIP-1102 - Enable clients to rebootstrap based on timeout or error code | 4.0.0 | Supported |
20612062

20622063

20632064

@@ -2074,7 +2075,7 @@ release of librdkafka.
20742075
| 0 | Produce | 10 | 10 |
20752076
| 1 | Fetch | 16 | 16 |
20762077
| 2 | ListOffsets | 8 | 7 |
2077-
| 3 | Metadata | 12 | 12 |
2078+
| 3 | Metadata | 13 | 13 |
20782079
| 8 | OffsetCommit | 9 | 9 |
20792080
| 9 | OffsetFetch | 9 | 9 |
20802081
| 10 | FindCoordinator | 4 | 2 |

Diff for: src/rdkafka.c

+15
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,9 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
718718
_ERR_DESC(RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE,
719719
"Broker: Client sent a push telemetry request larger than the "
720720
"maximum size the broker will accept"),
721+
_ERR_DESC(RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,
722+
"Broker: Client metadata is stale, "
723+
"client should rebootstrap to obtain new metadata."),
721724
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
722725

723726

@@ -2862,6 +2865,18 @@ void rd_kafka_rebootstrap(rd_kafka_t *rk) {
28622865
rd_kafka_rebootstrap_tmr_cb, NULL);
28632866
}
28642867

2868+
/**
2869+
* Restarts rebootstrap timer with the configured interval.
2870+
*
2871+
* @locks none
2872+
* @locality any
2873+
*/
2874+
void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk) {
2875+
rd_kafka_timer_start_oneshot(
2876+
&rk->rk_timers, &rk->rebootstrap_tmr, rd_true /*restart*/,
2877+
rk->rk_conf.metadata_recovery_rebootstrap_trigger_ms * 1000LL,
2878+
rd_kafka_rebootstrap_tmr_cb, NULL);
2879+
}
28652880

28662881
/**
28672882
* Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with

Diff for: src/rdkafka.h

+3
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,9 @@ typedef enum {
656656
/** Client sent a push telemetry request larger than the maximum size
657657
* the broker will accept. */
658658
RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE = 118,
659+
/** Client metadata is stale,
660+
* client should rebootstrap to obtain new metadata. */
661+
RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED = 129,
659662
RD_KAFKA_RESP_ERR_END_ALL,
660663
} rd_kafka_resp_err_t;
661664

Diff for: src/rdkafka_conf.c

+11
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,17 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
452452
.s2i = {{RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE, "none"},
453453
{RD_KAFKA_METADATA_RECOVERY_STRATEGY_REBOOTSTRAP, "rebootstrap"},
454454
{0, NULL}}},
455+
{_RK_GLOBAL, "metadata.recovery.rebootstrap.trigger.ms", _RK_C_INT,
456+
_RK(metadata_recovery_rebootstrap_trigger_ms),
457+
"If a client configured to rebootstrap using "
458+
"`metadata.recovery.strategy=rebootstrap` "
459+
"is unable to obtain metadata from any "
460+
"of the brokers for this interval, "
461+
"client repeats the bootstrap process using "
462+
"`bootstrap.servers` configuration "
463+
"and brokers added through "
464+
"`rd_kafka_brokers_add()`.",
465+
0, INT_MAX, 300000},
455466
{_RK_GLOBAL | _RK_DEPRECATED | _RK_HIDDEN, "metadata.request.timeout.ms",
456467
_RK_C_INT, _RK(metadata_request_timeout_ms), "Not used.", 10, 900 * 1000,
457468
10},

Diff for: src/rdkafka_conf.h

+1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ struct rd_kafka_conf_s {
207207
int msg_copy_max_size;
208208
int recv_max_msg_size;
209209
int max_inflight;
210+
int metadata_recovery_rebootstrap_trigger_ms;
210211
int metadata_request_timeout_ms;
211212
int metadata_refresh_interval_ms;
212213
int metadata_refresh_fast_cnt;

Diff for: src/rdkafka_int.h

+1
Original file line numberDiff line numberDiff line change
@@ -1237,5 +1237,6 @@ rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,
12371237

12381238
void rd_kafka_rebootstrap(rd_kafka_t *rk);
12391239

1240+
void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk);
12401241

12411242
#endif /* _RDKAFKA_INT_H_ */

Diff for: src/rdkafka_metadata.c

+15-1
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
574574
int broker_changes = 0;
575575
int cache_changes = 0;
576576
int cgrp_subscription_version = -1;
577-
577+
int16_t ErrorCode = 0;
578+
578579
/* If client rack is present, the metadata cache (topic or full) needs
579580
* to contain the partition to rack map. */
580581
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
@@ -874,6 +875,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
874875
ClusterAuthorizedOperations;
875876
}
876877

878+
if (ApiVersion >= 13) {
879+
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
880+
}
881+
877882
rd_kafka_buf_skip_tags(rkbuf);
878883

879884
/* Entire Metadata response now parsed without errors:
@@ -886,6 +891,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
886891
goto err;
887892
}
888893

894+
if (ErrorCode) {
895+
rd_rkb_dbg(rkb, METADATA, "METADATA",
896+
"Received top level error code: %" PRId16,
897+
ErrorCode);
898+
err = ErrorCode;
899+
goto err;
900+
}
901+
889902
/* Update our list of brokers. */
890903
for (i = 0; i < md->broker_cnt; i++) {
891904
rd_rkb_dbg(rkb, METADATA, "METADATA",
@@ -1035,6 +1048,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
10351048
rd_kafka_wrlock(rkb->rkb_rk);
10361049

10371050
rkb->rkb_rk->rk_ts_metadata = rd_clock();
1051+
rd_kafka_rebootstrap_tmr_restart(rkb->rkb_rk);
10381052

10391053
/* Update cached cluster id. */
10401054
if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&

Diff for: src/rdkafka_request.c

+11-5
Original file line numberDiff line numberDiff line change
@@ -2612,12 +2612,18 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
26122612
goto done;
26132613

26142614
err:
2615-
actions = rd_kafka_err_action(rkb, err, request,
2615+
actions = rd_kafka_err_action(
2616+
rkb, err, request,
26162617

2617-
RD_KAFKA_ERR_ACTION_RETRY,
2618-
RD_KAFKA_RESP_ERR__PARTIAL,
2618+
RD_KAFKA_ERR_ACTION_SPECIAL, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,
26192619

2620-
RD_KAFKA_ERR_ACTION_END);
2620+
RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__PARTIAL,
2621+
2622+
RD_KAFKA_ERR_ACTION_END);
2623+
2624+
if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) {
2625+
rd_kafka_rebootstrap(rk);
2626+
}
26212627

26222628
if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
26232629
/* In case it's a brokers full refresh call,
@@ -2747,7 +2753,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
27472753
int *full_incr = NULL;
27482754
void *handler_arg = NULL;
27492755
rd_kafka_resp_cb_t *handler_cb = rd_kafka_handle_Metadata;
2750-
int16_t metadata_max_version = 12;
2756+
int16_t metadata_max_version = 13;
27512757
rd_kafka_replyq_t use_replyq = replyq;
27522758

27532759
/* In case we want cluster authorized operations in the Metadata

0 commit comments

Comments
 (0)