
Conversation

@chirag-wadhwa5 (Collaborator) commented Nov 18, 2025

This PR is part of KIP-1226.

This PR adds integration tests to the ShareConsumerTest.java file to verify that share-partition lag is reported correctly in various scenarios.

Reviewers: Andrew Schofield [email protected]

@github-actions bot added the triage (PRs from the community), tests (Test fixes, including flaky tests), and clients labels on Nov 18, 2025
@AndrewJSchofield added the KIP-932 (Queues for Kafka) and ci-approved labels and removed the triage (PRs from the community) label on Nov 18, 2025
@AndrewJSchofield (Member) left a comment

Thanks for the PR. Just a few comments.

.toList();
}

private SharePartitionOffsetInfo sharePartitionDescription(Admin adminClient, String groupId, TopicPartition tp)

nit: Method name seems wrong. It's returning the share-partition offset info, not the description.
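A rename along the lines of this nit might look like the following sketch. Only the signature appears in the diff, so the exact name here is an assumption about what the reviewer intends:

```java
// Hypothetical rename: the method returns SharePartitionOffsetInfo,
// so name it after what it returns rather than "description".
private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, String groupId, TopicPartition tp)
```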

}

private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition)
throws InterruptedException, ExecutionException {
nit: The indentation on this method and the next is a bit odd because the throws and the start of the method body are equally indented. I'd just put the throws on the same line as the argument list for these methods.
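As an illustration of the suggested layout, here is a minimal sketch with a hypothetical, simplified signature (the real method takes an Admin client and queries the cluster; the placeholder body below just echoes its argument). With throws on the same line as the argument list, the method body is indented one level deeper than the signature, so there is no ambiguity:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;

public class FormattingSketch {
    // Suggested style: keep `throws` on the same line as the argument list.
    // Hypothetical simplified signature; the real method also takes an Admin
    // client and looks up the actual partition leader.
    static List<Integer> topicPartitionLeader(String topicName, int partition) throws ExecutionException {
        // Placeholder body standing in for the real leader lookup.
        return List.of(partition);
    }

    public static void main(String[] args) throws ExecutionException {
        System.out.println(topicPartitionLeader("test-topic", 0));
    }
}
```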

newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curShareCoordNodeId.get(0));
}, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
// After share coordinator shutdown and new leaderS election, check that lag is still 1
nit: new "leader's" not "leaderS"

serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
You do need replication factor 3, but you only need one partition for each of these topics. Then you'd not have to fiddle around trying to calculate the partitions used for the internal topics.

@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
I read the code from the bottom so my comment about not needing 3 partitions for these internal topics stands here too. You do need replication factor 3.
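Applying the suggestion to the quoted snippet would look something like this (a sketch only; the partition counts drop to 1 while the replication factors stay at 3):

```java
serverProperties = {
    // Replication factor 3 so the internal topics survive a broker shutdown;
    // a single partition avoids having to work out which partition the
    // offsets/share-coordinator state lands on.
    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
    @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
    @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
```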

@AndrewJSchofield (Member) left a comment

Looks good to me. We need a green build to merge.

