Skip to content

Commit 9da516b

Browse files
KAFKA-18392: Ensure client sets member ID for share group (apache#18649)
Reviewers: Apoorv Mittal <[email protected]>, Lianet Magrans <[email protected]>
1 parent 239708f commit 9da516b

File tree

3 files changed

+30
-29
lines changed

3 files changed

+30
-29
lines changed

Diff for: clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
2929
"about": "The group identifier." },
3030
{ "name": "MemberId", "type": "string", "versions": "0+",
31-
"about": "The member id." },
31+
"about": "The member id generated by the consumer. The member id must be kept during the entire lifetime of the consumer process." },
3232
{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
3333
"about": "The current member epoch; 0 to join the group; -1 to leave the group." },
3434
{ "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

Diff for: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+28-27
Original file line numberDiff line numberDiff line change
@@ -728,8 +728,7 @@ ConsumerGroup consumerGroup(
728728
* created if it does not exist.
729729
*
730730
* @return A ConsumerGroup.
731-
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
732-
* if the group is not a consumer group.
731+
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
733732
* @throws IllegalStateException if the group does not have the expected type.
734733
* Package private for testing.
735734
*/
@@ -846,28 +845,28 @@ private ShareGroup getOrMaybeCreateShareGroup(
846845

847846
if (group == null) {
848847
return new ShareGroup(snapshotRegistry, groupId);
848+
} else {
849+
if (group.type() == SHARE) {
850+
return (ShareGroup) group;
851+
} else {
852+
// We don't support upgrading/downgrading between protocols at the moment so
853+
// we throw an exception if a group exists with the wrong type.
854+
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
855+
}
849856
}
850-
851-
if (group.type() != SHARE) {
852-
// We don't support upgrading/downgrading between protocols at the moment so
853-
// we throw an exception if a group exists with the wrong type.
854-
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.",
855-
groupId));
856-
}
857-
858-
return (ShareGroup) group;
859857
}
860858

861859
/**
862-
* Gets or maybe creates a share group.
860+
* The method should be called on the replay path.
861+
* Gets or maybe creates a share group and updates the groups map if a new group is created.
863862
*
864863
* @param groupId The group id.
865864
* @param createIfNotExists A boolean indicating whether the group should be
866865
* created if it does not exist.
867866
*
868867
* @return A ShareGroup.
869-
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
870-
* if the group is not a consumer group.
868+
* @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
869+
* @throws IllegalStateException if the group does not have the expected type.
871870
*
872871
* Package private for testing.
873872
*/
@@ -878,22 +877,22 @@ ShareGroup getOrMaybeCreatePersistedShareGroup(
878877
Group group = groups.get(groupId);
879878

880879
if (group == null && !createIfNotExists) {
881-
throw new IllegalStateException(String.format("Share group %s not found.", groupId));
880+
throw new GroupIdNotFoundException(String.format("Share group %s not found.", groupId));
882881
}
883882

884883
if (group == null) {
885884
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, groupId);
886885
groups.put(groupId, shareGroup);
887886
return shareGroup;
887+
} else {
888+
if (group.type() == SHARE) {
889+
return (ShareGroup) group;
890+
} else {
891+
// We don't support upgrading/downgrading between protocols at the moment so
892+
// we throw an exception if a group exists with the wrong type.
893+
throw new IllegalStateException(String.format("Group %s is not a share group.", groupId));
894+
}
888895
}
889-
890-
if (group.type() != SHARE) {
891-
// We don't support upgrading/downgrading between protocols at the moment so
892-
// we throw an exception if a group exists with the wrong type.
893-
throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
894-
}
895-
896-
return (ShareGroup) group;
897896
}
898897

899898
/**
@@ -2032,11 +2031,10 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
20322031

20332032
// Get or create the share group.
20342033
boolean createIfNotExists = memberEpoch == 0;
2035-
final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, createIfNotExists);
2034+
final ShareGroup group = getOrMaybeCreateShareGroup(groupId, createIfNotExists);
20362035
throwIfShareGroupIsFull(group, memberId);
20372036

20382037
// Get or create the member.
2039-
if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
20402038
ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
20412039
group,
20422040
memberId,
@@ -2143,9 +2141,12 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
21432141
.setHeartbeatIntervalMs(shareGroupHeartbeatIntervalMs(groupId));
21442142

21452143
// The assignment is only provided in the following cases:
2146-
// 1. The member just joined or rejoined to group (epoch equals to zero);
2144+
// 1. The member sent a full request. It does so when joining or rejoining the group with zero
2145+
// as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields
2146+
// (subscribedTopicNames) to detect a full request as those must be set in a full request.
21472147
// 2. The member's assignment has been updated.
2148-
if (memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
2148+
boolean isFullRequest = subscribedTopicNames != null;
2149+
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
21492150
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
21502151
}
21512152

Diff for: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14537,7 +14537,7 @@ public void testShareGroupUnknownGroupId() {
1453714537
.withShareGroupAssignor(assignor)
1453814538
.build();
1453914539

14540-
assertThrows(IllegalStateException.class, () ->
14540+
assertThrows(GroupIdNotFoundException.class, () ->
1454114541
context.shareGroupHeartbeat(
1454214542
new ShareGroupHeartbeatRequestData()
1454314543
.setGroupId(groupId)

0 commit comments

Comments
 (0)