Skip to content

[WIP] KAFKA-19012: Messages ending up on the wrong topic #20146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: 3.7
Choose a base branch
from

Conversation

kirktrue
Copy link
Contributor

Adding instrumentation for KafkaProducer to detect when the topic a message was written to changes after it's enqueued.

kirktrue and others added 5 commits July 10, 2025 12:43
Adding instrumentation for KafkaProducer to detect when the topic a message was written to changes after it's enqueued.
Updated updateInconsistentTopics to accept the batch's TopicPartition and Records instead of passing in the ProducerBatch directly.
Introduces a dedicated class to perform instrumentation at points before decompression is required.
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue : Thanks for the PR. LGTM. Just a minor comment. Also, could you resolve the conflicts?

@@ -127,6 +128,7 @@ public RecordAccumulator(LogContext logContext,
Time time,
ApiVersions apiVersions,
TransactionManager transactionManager,
Kafka19012Instrumentation kafka19012Instrumentation,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the javadoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the javadoc for the new param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue : Thanks for the updated PR. Just a minor comment.

@@ -127,6 +128,7 @@ public RecordAccumulator(LogContext logContext,
Time time,
ApiVersions apiVersions,
TransactionManager transactionManager,
Kafka19012Instrumentation kafka19012Instrumentation,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the javadoc for the new param?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants