-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19500: kafka-consumer-groups.sh
should fail quickly if the partition leader is unavailable
#20168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The all methods which depend on
And they are all called in method |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Just one minor comment.
private void checkAllTopicPartitionsHaveLeader(Collection<TopicPartition> partitionsToReset) { | ||
List<TopicPartition> partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset); | ||
if (!partitionsWithoutLeader.isEmpty()) { | ||
// append the TopicPartition list string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You could do this string concatenation much more neatly with something like
String partitionStr = partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AndrewJSchofield Thanks very much for CR, it's indeed much neater. I have fixed it, PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @xijiu for this patch, a little comment
@@ -659,6 +663,26 @@ public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance cluster) | |||
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); | |||
} | |||
|
|||
@Timeout(60) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: No need to add @Timeout
here, @ClusterTest
already includes a timeout setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@m1a2st Thanks very much, the @Timeout
is redundant, and I have remove it, PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could do with a little more work. If I create a topic T1
with 1 partition, the following command bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group CG1 --to-earliest --topic T1:1 --execute
still hangs for a long time, even though it has called Admin.describeTopics
.
I think it can be improved for the case of a non-existent topic of an existing topic. Seems like it's worth fixing this at the same time.
Good find! It's indeed hangs when a non-existent TopicPartition of an existing topic. I have added some logic to determine whether the target TopicPartitions actually exists, PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. Just a few minor comments now.
List<TopicPartition> partitionsNotExistList = filterNotExistPartitions(partitionsToReset); | ||
if (!partitionsNotExistList.isEmpty()) { | ||
String partitionStr = partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(",")); | ||
throw new UnknownTopicOrPartitionException("The partitions \"" + partitionStr + "\" does not exist"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Grammar "do not exist" I think.
throw new LeaderNotAvailableException("The partitions \"" + partitionStr + "\" have no leader"); | ||
} | ||
} | ||
|
||
private List<TopicPartition> filterNotExistPartitions(Collection<TopicPartition> topicPartitions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: filterNonExistentPartitions
is probably a bit better.
.toList(); | ||
|
||
return topicPartitions.stream().filter(element -> !existPartitions.contains(element)).toList(); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency, we tend to catch (InterruptedException | ExecutionException e)
in this source file.
Also please merge latest changes from trunk. Hopefully you'll get a clean test run then. |
@AndrewJSchofield Thanks for the code review, and I have fixed them and merged the latest changes from trunk, PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Looks good to me.
} | ||
|
||
// check the partitions have leader | ||
List<TopicPartition> partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if there is a topic having three partitions, and the t-2
partition is offline, then if partitionsToReset
is t-0,t-1
, filterNoneLeaderPartitions
will return t-2
, causing the tool to fail. Is it expected?
ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset
inorder to fail quickly
Reviewers: TaiJuWu [email protected], Lan Ding [email protected],
Ken Huang [email protected], Andrew Schofield
[email protected]