Skip to content

Commit eda7068

Browse files
committed
[KIP-848] Fix static group membership
There are three cases: (1) the member explicitly unsubscribes -> should cause a rebalance; (2) the member closes the consumer -> no rebalance; (3) the member destroys the consumer -> no rebalance. When a new member joins the group and there is already a member with the same group instance id that hasn't left statically, the new member is fenced with the new protocol.
1 parent 9387761 commit eda7068

6 files changed

+170
-63
lines changed

src/rdkafka_cgrp.c

+7-5
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,8 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk,
981981
goto err;
982982
}
983983

984-
static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
984+
static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg,
985+
rd_bool_t static_leave) {
985986
int32_t member_epoch = -1;
986987

987988
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
@@ -999,7 +1000,7 @@ static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
9991000
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
10001001

10011002
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
1002-
if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg)) {
1003+
if (static_leave && RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg)) {
10031004
member_epoch = -2;
10041005
}
10051006

@@ -1078,7 +1079,8 @@ static rd_bool_t rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t *rkcg) {
10781079
return rd_false;
10791080

10801081
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
1081-
rd_kafka_cgrp_consumer_leave(rkcg);
1082+
rd_kafka_cgrp_consumer_leave(
1083+
rkcg, rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE);
10821084
} else {
10831085
/* KIP-345: Static group members must not send a
10841086
* LeaveGroupRequest on termination. */
@@ -1386,7 +1388,6 @@ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
13861388
rd_kafka_cgrp_leave_maybe(rkcg);
13871389
}
13881390

1389-
rd_kafka_cgrp_consumer_reset(rkcg);
13901391
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
13911392
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg, "rejoining");
13921393
}
@@ -3096,6 +3097,7 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
30963097
"ConsumerGroupHeartbeat failed due to: %s: "
30973098
"will rejoin the group",
30983099
rd_kafka_err2str(err));
3100+
rd_kafka_cgrp_consumer_reset(rkcg);
30993101
rkcg->rkcg_consumer_flags |=
31003102
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
31013103
return;
@@ -4963,7 +4965,7 @@ rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts,
49634965
1 /*lock*/);
49644966

49654967
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
4966-
rd_kafka_cgrp_consumer_leave(rkcg);
4968+
rd_kafka_cgrp_consumer_leave(rkcg, rd_false);
49674969
rkcg->rkcg_consumer_flags |=
49684970
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
49694971
rd_kafka_cgrp_consumer_expedite_next_heartbeat(

src/rdkafka_mock.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -3036,7 +3036,8 @@ static int ut_cgrp_consumer_member_next_assignment0(
30363036
fixtures[i].comment);
30373037

30383038
if (fixtures[i].session_timed_out) {
3039-
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member);
3039+
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member,
3040+
rd_false);
30403041
member = rd_kafka_mock_cgrp_consumer_member_add(
30413042
mcgrp, conn, &MemberId, &InstanceId,
30423043
&SubscribedTopic, 1);

src/rdkafka_mock_cgrp.c

+34-16
Original file line numberDiff line numberDiff line change
@@ -1462,18 +1462,26 @@ rd_kafka_mock_cgrp_consumer_member_add(rd_kafka_mock_cgrp_consumer_t *mcgrp,
14621462
/* Find member */
14631463
member = rd_kafka_mock_cgrp_consumer_member_find(mcgrp, MemberId);
14641464
if (!member) {
1465+
if (RD_KAFKAP_STR_LEN(MemberId) == 0)
1466+
/* KIP 1082: MemberId is generated by the client */
1467+
return NULL;
1468+
14651469
member = rd_kafka_mock_cgrp_consumer_member_find_by_instance_id(
14661470
mcgrp, InstanceId);
1467-
if (member && RD_KAFKAP_STR_LEN(MemberId) > 0 &&
1471+
1472+
if (member &&
14681473
rd_kafkap_str_cmp_str(MemberId, member->id) != 0) {
1469-
/* Either member is a new instance and is rejoining
1470-
* with same InstanceId, so MemberId is NULL,
1471-
* or it's rejoining after unsubscribing,
1472-
* then it must have the same MemberId as before,
1473-
* as it lasts for member lifetime.
1474-
* It both don't hold, we cannot add the member
1475-
* to the group. */
1476-
return NULL;
1474+
/* Member is a new instance and is rejoining
1475+
* with a new MemberId. */
1476+
1477+
if (!member->left_static_membership)
1478+
/* Old member still active,
1479+
* fence this one */
1480+
return NULL;
1481+
1482+
RD_IF_FREE(member->id, rd_free);
1483+
member->id = RD_KAFKAP_STR_DUP(MemberId);
1484+
member->left_static_membership = rd_false;
14771485
}
14781486
}
14791487

@@ -1555,29 +1563,39 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy(
15551563
rd_free(member);
15561564
}
15571565

1566+
static void rd_kafka_mock_cgrp_consumer_member_leave_static(
1567+
rd_kafka_mock_cgrp_consumer_member_t *member) {
1568+
member->left_static_membership = rd_true;
1569+
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(member,
1570+
NULL);
1571+
}
1572+
15581573

15591574
/**
15601575
* @brief Called when a member must leave a consumer group.
15611576
*
15621577
* @param mcgrp Consumer group to leave.
15631578
* @param member Member that leaves.
1564-
* @param is_static If true, the member is leaving with static group membership.
1579+
* @param leave_static If true, the member is leaving with static group
1580+
* membership.
15651581
*
15661582
* @locks mcluster->lock MUST be held.
15671583
*/
15681584
void rd_kafka_mock_cgrp_consumer_member_leave(
15691585
rd_kafka_mock_cgrp_consumer_t *mcgrp,
1570-
rd_kafka_mock_cgrp_consumer_member_t *member) {
1586+
rd_kafka_mock_cgrp_consumer_member_t *member,
1587+
rd_bool_t leave_static) {
15711588
rd_bool_t is_static = member->instance_id != NULL;
15721589

15731590
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
1574-
"Member %s is leaving group %s, is static: %s", member->id,
1575-
mcgrp->id, RD_STR_ToF(is_static));
1576-
if (!is_static)
1591+
"Member %s is leaving group %s, is static: %s, "
1592+
"static leave: %s",
1593+
member->id, mcgrp->id, RD_STR_ToF(is_static),
1594+
RD_STR_ToF(leave_static));
1595+
if (!is_static || !leave_static)
15771596
rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);
15781597
else
1579-
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
1580-
member, NULL);
1598+
rd_kafka_mock_cgrp_consumer_member_leave_static(member);
15811599
}
15821600

15831601
/**

src/rdkafka_mock_handlers.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -2884,7 +2884,7 @@ rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn,
28842884
}
28852885
} else {
28862886
rd_kafka_mock_cgrp_consumer_member_leave(
2887-
mcgrp, member);
2887+
mcgrp, member, MemberEpoch == -2);
28882888
member = NULL;
28892889
}
28902890
} else {

src/rdkafka_mock_int.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s {
166166

167167
rd_list_t *subscribed_topics; /**< Subscribed topics */
168168

169+
rd_bool_t left_static_membership; /**< Member has left the group
170+
* with static membership. */
171+
169172
struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL
170173
* if there is no ongoing
171174
* request. */
@@ -671,7 +674,8 @@ rd_kafka_mock_cgrp_consumer_get(rd_kafka_mock_cluster_t *mcluster,
671674

672675
void rd_kafka_mock_cgrp_consumer_member_leave(
673676
rd_kafka_mock_cgrp_consumer_t *mcgrp,
674-
rd_kafka_mock_cgrp_consumer_member_t *member);
677+
rd_kafka_mock_cgrp_consumer_member_t *member,
678+
rd_bool_t static_leave);
675679

676680
void rd_kafka_mock_cgrp_consumer_member_fenced(
677681
rd_kafka_mock_cgrp_consumer_t *mcgrp,

0 commit comments

Comments
 (0)