KAFKA-17894: Implemented broker topic metrics for Share Group 1/N (KIP-1103) #18444

Merged
merged 11 commits into apache:trunk on Jan 24, 2025

Conversation

apoorvmittal10 (Collaborator):

The PR implements the BrokerTopicMetrics defined in KIP-1103.

The PR also corrects the share-acknowledgement-rate and share-acknowledgement-count metrics defined in KIP-932, as they are moved to BrokerTopicMetrics; the necessary changes to the KIP-932 broker metrics will be made once KIP-1103 is complete.
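
For orientation, a rough usage sketch of the per-topic meters touched here (accessor names as they appear in this PR's diff; failedShareAcknowledgementRequestRate is assumed by analogy with the fetch-side meters):

// Sketch only: a fragment of broker code, not a standalone program.
String topic = topicIdPartition.topic();
brokerTopicStats.topicStats(topic).totalShareFetchRequestRate().mark();
brokerTopicStats.topicStats(topic).failedShareFetchRequestRate().mark();
brokerTopicStats.topicStats(topic).totalShareAcknowledgementRequestRate().mark();
brokerTopicStats.topicStats(topic).failedShareAcknowledgementRequestRate().mark();  // assumed accessor name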

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@AndrewJSchofield (Member) left a comment:

Thanks for the PR. I've added some comments from my initial review.

metricTypeMap.put(TOTAL_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_PRODUCE_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_SHARE_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_SHARE_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, "requests"));
Member:

I think that this metric is missing from KIP-1103.

Collaborator Author:

Yes, I remember you pointed this out in the KIP-1103 discussion as well.

AS4, AS6: We already have a share-acknowledgement Meter metric defined in KIP-932. My view is that we should move the KIP-932 share-acknowledgement metric to TotalShareAcknowledgementRequestsPerSec; hence I have added the corresponding FailedShareAcknowledgementRequestsPerSec in this KIP.

Member:

I guess KIP-1103 will be updated after merging this PR?

Collaborator Author:

We will update KIP-932 or KIP-1103 with this change. Once all the KIP-1103 metrics are merged, I'll summarize these minor changes and update both KIPs accordingly so they stay in parity.
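
For readers unfamiliar with the registration pattern quoted above, here is a minimal illustrative sketch of a MeterWrapper-style lazy meter built directly on Yammer metrics; it is not Kafka's actual MeterWrapper, and the group/type naming is an assumption chosen to mirror the usual BrokerTopicMetrics MBean names.

// Illustrative sketch only, not Kafka's actual MeterWrapper: a wrapper that
// lazily creates a Yammer Meter (the metrics library BrokerTopicMetrics uses)
// the first time it is needed.
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;

import java.util.concurrent.TimeUnit;

final class MeterWrapperSketch {
    private final MetricsRegistry registry;
    private final String metricType;  // e.g. "TotalShareAcknowledgementRequestsPerSec"
    private final String eventType;   // e.g. "requests"
    private volatile Meter meter;     // created on first use

    MeterWrapperSketch(MetricsRegistry registry, String metricType, String eventType) {
        this.registry = registry;
        this.metricType = metricType;
        this.eventType = eventType;
    }

    synchronized Meter meter() {
        if (meter == null) {
            // Group/type mirror the usual BrokerTopicMetrics naming (assumption).
            meter = registry.newMeter(
                new MetricName("kafka.server", "BrokerTopicMetrics", metricType),
                eventType, TimeUnit.SECONDS);
        }
        return meter;
    }
}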

@@ -87,4 +95,104 @@ public void testFilterErroneousTopicPartitions() {
assertTrue(result.isEmpty());
}

@Test
@SuppressWarnings("unchecked")
public void testMayBeCompleteWithErroneousTopicPartitions() {
Member:

nit: This should be "Maybe".

Collaborator Author:

Done.

Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
// Update share acknowledgement metrics.
brokerTopicStats.topicStats(topicIdPartition.topicPartition().topic()).totalShareAcknowledgementRequestRate().mark();
Member:

Can't you just use topicIdPartition.topic()?

Collaborator Author:

Done.
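
For reference, the shorter form being suggested (TopicIdPartition exposes the topic name directly):

// Equivalent to topicIdPartition.topicPartition().topic(), without the extra hop.
brokerTopicStats.topicStats(topicIdPartition.topic()).totalShareAcknowledgementRequestRate().mark();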

private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) {
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
Optional<BiConsumer<Collection<String>, Boolean>> metricsHandler
Member:

Personally, I think it would be neater to have an interface for the metrics handler as opposed to using BiConsumer<Collection<String>, Boolean> everywhere.

Collaborator Author:

I got rid of one BiConsumer by passing BrokerTopicStats to ShareFetch; with the recent refactoring that moved the class to the server module, we can hand BrokerTopicStats to ShareFetch directly. The other BiConsumer I simplified to a Consumer. As only one such functional call exists now, I skipped creating an interface. Please let me know if you disagree.
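
To make the simplification concrete, the signatures before and after look roughly like this (bodies elided; drawn from the diffs in this thread):

// Before: failure marking passed around as a raw BiConsumer plus a boolean flag.
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
    Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
    Optional<BiConsumer<Collection<String>, Boolean>> metricsHandler
) { /* ... */ }

// After: one handler removed by giving ShareFetch a BrokerTopicStats reference,
// the remaining one narrowed to a Consumer over the affected topic names.
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
    Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
    Optional<Consumer<Collection<String>>> failedMetricsHandler
) { /* ... */ }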

// Update failed share fetch request metric.
topicIdPartitions.forEach(topicIdPartition ->
brokerTopicStats.topicStats(topicIdPartition.topicPartition().topic()).failedShareFetchRequestRate().mark());
if (allTopicPartitionsFailed) {
Member:

Interesting. What you have done here is mark the all-topics metric only when all topic partitions failed, whereas I was really expecting you to mark it for every topic partition that failed, rather than only when they all did.

If you look at ReplicaManager.appendToLocalLog, it marks once per failed topic partition, not once when all topic partitions failed. I suspect this is wrong and the boolean parameter is not necessary.

Collaborator Author:

Yes, I thought about this while implementing, and I also saw that the failed fetch metric is marked whenever any topic fetch fails. I was thinking in terms of how the metrics would be used. Take failedShareFetchRequestRate: if we always mark the all-topics metric as failed whenever any one topic fetch fails, then the two metrics might not yield much additional value; the topic metric becomes more like a log that helps debug which topic fetch failed.

Marking the all-topics metric as failed when any topic fetch fails is desirable when the complete request fails on any topic fetch failure, which seems to be the case for the regular fetch. But that is not true for share fetch. My thinking was that for share fetch, if the all-topics metric records a failure then it's critical (the complete fetch/acknowledge request failed), whereas if a topic-level metric records a failure then the operator should debug what caused that topic's failure (for a single topic the two metrics will yield the same result).

Also, I find bumping up the all-topics stats for each topic-partition in a request questionable:

brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

It gives an incorrect request rate for the overall metric, so I avoided that in the implementation as well.

I know this differs from what we currently have, but I am struggling to find value in the existing implementation. I might be missing something, so I am open to suggestions and happy to correct things.

Member:

Yes, I was just highlighting a difference in approach here. I'll have a think about which is better.

Collaborator Author:

Sounds good, I'll address the other comments in the meantime.

Member:

I agree with you.

Collaborator Author:

@junrao Can you please also review this behaviour, as it differs from the existing fetch BrokerTopicMetrics? Just want to be sure.

Contributor:

  1. The intention of brokerTopicStats.topicStats and brokerTopicStats.allTopicsStats is that they record the same value, just at different levels. The idea is that if it's too expensive to track at the topic level, one can at least get the same signal at the aggregate level.
  2. I agree that marking the request rate metric for each partition seems incorrect. We need to mark the metric at most once per request and we want to do that consistently for total and failed requests.
  3. If it's important to track shareFetchRequests with all topic-partition failed, we can introduce a separate metric for that.

Member:

@junrao Thanks for the context. We should be consistent here too.

Collaborator Author:

Thanks @junrao and @AndrewJSchofield. I have updated the behaviour as discussed in the thread.

So now only one thing differs between share fetch and the existing fetch, per point 2 above from @junrao: for a share fetch, if one topic with two partitions arrives and both partitions fail, only one failure and one total share fetch will be registered against the topic, unlike the existing fetch metrics where two will be recorded (one per partition).
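
In other words, a hedged sketch of the agreed marking (a fragment; topicIdPartitions stands for the partitions in one share fetch request):

// Mark each meter once per distinct topic per request, never once per partition.
Set<String> topics = topicIdPartitions.stream()
    .map(TopicIdPartition::topic)
    .collect(Collectors.toSet());
topics.forEach(topic -> {
    brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().mark();
    brokerTopicStats.topicStats(topic).totalShareFetchRequestRate().mark();
});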

return (topicIdPartitions, allTopicPartitionsFailed) -> {
// Update failed share fetch request metric.
topicIdPartitions.forEach(topicIdPartition ->
brokerTopicStats.topicStats(topicIdPartition.topicPartition().topic()).failedShareFetchRequestRate().mark());
Member:

This seems like another instance where topicIdPartition.topicPartition().topic() can just be topicIdPartition.topic().

Collaborator Author:

Done.

apoorvmittal10 (Collaborator Author):

I have addressed the comments; the PR is ready for review.

@junrao (Contributor) left a comment:

@apoorvmittal10: Thanks for the PR. I made a pass over the non-test files and left a couple of comments.

return mapAcknowledgementFutures(futures);
// Update the metrics for the topics for which we have received an acknowledgement.
topics.forEach(topic -> {
brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark();
Contributor:

Does this track just the ShareAcknowledgementRequest or the acks in ShareFetchRequest too? If yes, we probably want to name the metric more accurately. If not, this method is called by both ShareFetchRequest and ShareAcknowledgeRequest and we need to separate them out.

Member:

I believe it's called for both. I don't agree that they need to be separated out. There are two equivalent ways of acknowledging delivery and I would include both in the same metrics.

Collaborator Author:

Yes, I agree with Andrew. The intent is to track share acknowledgements for topics, whichever of the two request types carries them. The explicit ShareAcknowledge and ShareFetch RPCs themselves will be tracked by the kafka.network request metrics, as you suggested in the other comment.

@@ -598,6 +632,12 @@ void processShareFetch(ShareFetch shareFetch) {
sharePartitions.put(topicIdPartition, sharePartition);
}

// Update the metrics for the topics for which we have received a share fetch request.
topics.forEach(topic -> {
brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().mark();
Contributor:

We already have a generic metric that tracks the rate for each type of request. So, we probably don't need this one.

Collaborator Author:

But shouldn't we have something similar, like totalShareFetchRate, for share fetch, since it's tracking per topic?

Contributor:

We can keep the per topic one, but we don't need the one that aggregates across all topics.

Collaborator Author:

I might be missing something here, but this metric is similar to TotalFetchRequestsPerSec in that it gives an aggregated view across all topic metrics. Are you suggesting that there should be a cleanup of the existing metrics as well, so the aggregated ones are not emitted in future? Sorry, I think I am missing something basic here, so I wanted to clarify before making further changes.

public static final String TOTAL_FETCH_REQUESTS_PER_SEC = "TotalFetchRequestsPerSec";

Contributor:

TotalFetchRequestsPerSec is also duplicating the generic request rate metric. Since it's already there, we can clean it up later. However, it would be useful to avoid introducing duplicated metrics for the new code.

Collaborator Author:

I discussed with @junrao: the existing TotalFetchRequestsPerSec is not an exact duplicate of kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Fetch, as the former tracks an aggregated per-topic metric while the latter tracks the Fetch RPC. So I think we are good here.
(screenshot attached)
cc: @AndrewJSchofield

Member:

"the former tracks an aggregated per topic"

It seems we don't address that. It is based on topic-partition.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1547

The produce request has similar behavior.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1298

Maybe we should fix both the fetch and produce metrics.

Collaborator Author:

@chia7712 Yes, you are right. As also discussed in the thread here, I have created a task to fix that: https://issues.apache.org/jira/browse/KAFKA-18640
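
To make the distinction between the two metrics concrete, here is a hedged monitoring sketch that reads both over JMX (the endpoint is a placeholder; the attribute and MBean names reflect common Kafka/Yammer conventions rather than anything added by this PR):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class FetchRateComparison {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; point it at the broker's JMX port.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            // RPC-level meter: one event per Fetch request handled by the broker.
            for (ObjectName name : conn.queryNames(
                    new ObjectName("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Fetch,*"), null)) {
                System.out.println(name + " OneMinuteRate=" + conn.getAttribute(name, "OneMinuteRate"));
            }

            // BrokerTopicMetrics aggregate: built from per-topic (today per-partition, see KAFKA-18640)
            // marking, so it is not a duplicate of the RPC meter above.
            ObjectName aggregate = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec");
            System.out.println(aggregate + " OneMinuteRate=" + conn.getAttribute(aggregate, "OneMinuteRate"));
        }
    }
}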

private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) {
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
Optional<Consumer<Collection<String>>> metricsHandler
Member:

This really is the failedMetricsHandler

Collaborator Author:

Done.

*
* @return A Consumer that updates the failed share acknowledge request metrics.
*/
private Consumer<Collection<String>> failedShareAcknowledgeMetricsHandler() {
Member:

I think this Collection is actually a Set in practice. Given that you're taking steps to ensure that there are no duplicates in the collection, I personally would make the signature use a Set also.

Collaborator Author:

Done.
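
A minimal sketch of the Set-based handler after this change (the failedShareAcknowledgementRequestRate accessor is assumed by analogy with the fetch-side meters; the real code may differ):

// Marks the failed share acknowledgement meters once per erroneous topic.
private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
    return erroneousTopics -> erroneousTopics.forEach(topic -> {
        brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().mark();
        brokerTopicStats.topicStats(topic).failedShareAcknowledgementRequestRate().mark();
    });
}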

erroneous.forEach((topicIdPartition, throwable) -> {
erroneousTopics.add(topicIdPartition.topic());
response.put(topicIdPartition, new PartitionData()
.setErrorCode(Errors.forException(throwable).code())
Member:

I would expect the partition index to be initialised in the PartitionData here.

Collaborator Author:

Yeah, it's good to add. Though in KafkaApis, while forming the response, the partition index is set as well because the map contains TopicIdPartition as the key.
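
A small sketch of the suggested initialisation (setPartitionIndex is assumed to be the generated setter for the PartitionIndex field):

erroneous.forEach((topicIdPartition, throwable) -> {
    erroneousTopics.add(topicIdPartition.topic());
    response.put(topicIdPartition, new PartitionData()
        .setPartitionIndex(topicIdPartition.partition())  // initialise the index here as well
        .setErrorCode(Errors.forException(throwable).code()));
});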

@AndrewJSchofield (Member) left a comment:

lgtm. Thanks for the PR.

@junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the updated PR. LGTM.

Could we file a followup jira to change the existing totalFetch/totalProduce metrics to be consistent with the totalShare metrics?

apoorvmittal10 (Collaborator Author):

@junrao thanks, I have created the task: https://issues.apache.org/jira/browse/KAFKA-18640

apoorvmittal10 requested a review from junrao, January 24, 2025 14:36
@junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for filing the followup jira. LGTM

junrao merged commit 70eab77 into apache:trunk on Jan 24, 2025
9 checks passed
pranavt84 (Jan 27, 2025), airlock-confluentinc bot (Jan 27, 2025), pdruley (Feb 12, 2025), and manoj-mathivanan (Feb 19, 2025) each pushed a commit referencing this pull request, with the same commit message:

KAFKA-17894: Implemented broker topic metrics for Share Group 1/N (KIP-1103) (apache#18444)

The PR implements the BrokerTopicMetrics defined in KIP-1103.

The PR also corrected the share-acknowledgement-rate and share-acknowledgement-count metrics defined in KIP-932 as they are moved to BrokerTopicMetrics; necessary changes to KIP-932 broker metrics will be done once we complete KIP-1103.

Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
Labels: core (Kafka Broker), KIP-932 (Queues for Kafka), storage (Pull requests that target the storage module)