KAFKA-17019: Producer TimeoutException should include root cause #20159
@chickenchickenlove: Thanks for the patch.
    this.await(timeout, unit, null);
}

public void await(long timeout, TimeUnit unit, PotentialCauseException potentialCauseException) {
Could we change PotentialCauseException to Supplier<PotentialCauseException>?
Creating an exception instance is expensive (the constructor captures a stack trace); lazy creation would let us pay that cost only when a timeout actually occurs.
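To illustrate the suggestion, here is a minimal sketch of lazy cause creation. It is not the PR's code: `LazyCauseDemo`, `awaitOrFail`, the `CREATED` counter, and the nested `PotentialCauseException` are hypothetical stand-ins, and a `CountDownLatch` plays the role of the awaited result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyCauseDemo {
    // Counts how often the stand-in cause exception is actually constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for the PR's PotentialCauseException.
    static class PotentialCauseException extends RuntimeException {
        PotentialCauseException(String message) {
            super(message);
            CREATED.incrementAndGet();
        }
    }

    // Waits on the latch; the supplier is invoked only if the wait times out.
    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit,
                            Supplier<PotentialCauseException> causeSupplier) {
        boolean completed;
        try {
            completed = latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        if (!completed) {
            Throwable cause = causeSupplier == null ? null : causeSupplier.get();
            throw new IllegalStateException("Timeout after waiting for " + timeout + " " + unit, cause);
        }
    }

    public static void main(String[] args) {
        Supplier<PotentialCauseException> cause =
            () -> new PotentialCauseException("broker may be unavailable");

        // Latch already at zero: the wait succeeds and no exception is built.
        awaitOrFail(new CountDownLatch(0), 10, TimeUnit.MILLISECONDS, cause);
        System.out.println("created on success path: " + CREATED.get());

        try {
            // Latch is never counted down: the wait times out and the supplier runs.
            awaitOrFail(new CountDownLatch(1), 10, TimeUnit.MILLISECONDS, cause);
        } catch (IllegalStateException e) {
            System.out.println("created on timeout path: " + CREATED.get());
        }
    }
}
```

With a plain `PotentialCauseException` parameter, the instance would have to be constructed before every call, including the calls that never time out.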
Thanks for your comments, that sounds great!
I have addressed the review comments.
When you have time, please take another look. 🙇♂️
a77073f to abc86a6
Thanks for the PR. Because you've created a new public exception class
@chickenchickenlove thanks for this patch. One major comment is left. Please take a look.
throw new TimeoutException(errorMessage);
if (ex.getCause() != null)
    throw new TimeoutException(errorMessage, ex.getCause());
throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
It seems PotentialCauseException does not offer more useful information than the error message, right? Perhaps we could enrich the error message instead of introducing a new nonspecific exception.
@chia7712 thanks for your comment. 🙇♂️
I agree with your point that 'we could enrich the error message instead of introducing a new nonspecific exception'.
However, introducing a new nonspecific exception to solve this issue has pros and cons.
Let me explain.
KAFKA-17019 requires that every org.apache.kafka.common.errors.TimeoutException include its root cause as a nested exception.
However, it's hard to pinpoint the root cause in every situation where a TimeoutException occurs:
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
Lines 405 to 418 in c6cf517
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, new TimeoutException(errorMessage), false);
Here, expiredInflightBatches and expiredBatches can cause a TimeoutException.
However, the TimeoutException is thrown as a result of calculating elapsed time.
Therefore, expiredInflightBatches and expiredBatches carry no information about the root cause.
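A simplified sketch of why no cause is available at this point, assuming a batch is expired purely by comparing its age with a timeout. `ExpiryDemo`, `Batch`, `expiredBatches`, and `timeoutFor` are hypothetical names for illustration, not Kafka's `Sender` or `ProducerBatch`, and a plain `RuntimeException` stands in for the timeout.

```java
import java.util.ArrayList;
import java.util.List;

class ExpiryDemo {
    // Hypothetical, simplified batch; not Kafka's ProducerBatch.
    static class Batch {
        final String topicPartition;
        final long createdMs;

        Batch(String topicPartition, long createdMs) {
            this.topicPartition = topicPartition;
            this.createdMs = createdMs;
        }
    }

    // Selects batches purely by comparing their age against the timeout.
    static List<Batch> expiredBatches(List<Batch> batches, long now, long deliveryTimeoutMs) {
        List<Batch> expired = new ArrayList<>();
        for (Batch b : batches) {
            if (now - b.createdMs >= deliveryTimeoutMs)
                expired.add(b);
        }
        return expired;
    }

    // The timeout is derived from the clock alone, so there is no exception to nest.
    static RuntimeException timeoutFor(Batch b, long now) {
        return new RuntimeException("Expiring batch for " + b.topicPartition
            + ":" + (now - b.createdMs) + " ms has passed since batch creation");
    }

    public static void main(String[] args) {
        long now = 10_000;
        List<Batch> batches = List.of(new Batch("orders-0", 1_000), new Batch("orders-1", 9_500));
        for (Batch b : expiredBatches(batches, now, 5_000)) {
            RuntimeException e = timeoutFor(b, now);
            System.out.println(e.getMessage() + " / cause = " + e.getCause());
        }
    }
}
```

Whatever slowed the batch down (network, CPU, broker) never surfaces as an exception in this path, which is why `getCause()` has nothing to return.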
Consider what can happen to expiredInflight:
- Normal case: no errors; the request succeeds.
- Network issue: an error might occur only after the expiry period.
- Slightly slow network (limited bandwidth, and so on): no errors, but a slow response.
- Busy CPU for various reasons: no errors, but the call is made late.
Even a simple analysis shows many scenarios in which a TimeoutException can occur.
However, the expiredInFlight instance can actually throw an error only in the second case (for example, connection closed or failure to establish a connection).
After spending considerable time reviewing the code and analyzing the TimeoutExceptions thrown by the Producer, I concluded that it's difficult to extract the root cause at every point where a TimeoutException is created.
Idea + Pros and Cons
Developers usually have a good understanding of what kind of error might occur in a given context.
Therefore, in cases where it's difficult to catch the actual root cause, it's possible to include an expected exception as the root cause instead.
Here’s a summary of the pros and cons (compared to simply enhancing the error message):
- Pros: At call sites (for example, Kafka Streams, Kafka Connect, the Kafka producer internals, and so on) where a TimeoutException is expected, the root cause can be used to handle different cases conditionally.
  - For example, you could create NetworkPotentialCauseException and CPUBusyPotentialCauseException as subclasses of PotentialCauseException, and handle different branches based on the root cause: branch A if the root cause is a NetworkPotentialCauseException, and branch B if it's a CPUBusyPotentialCauseException.
- Cons: Higher instance creation cost compared to a String instance.
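The branching described under "Pros" could look roughly like the sketch below. All classes here are hypothetical stand-ins defined locally (including `TimeoutException`, which is not Kafka's class), and the branch descriptions returned by `handle` are invented for illustration.

```java
class CauseBranchingDemo {
    // Hypothetical stand-ins; none of these are Kafka's real classes.
    static class PotentialCauseException extends RuntimeException {
        PotentialCauseException(String message) { super(message); }
    }

    static class NetworkPotentialCauseException extends PotentialCauseException {
        NetworkPotentialCauseException(String message) { super(message); }
    }

    static class CPUBusyPotentialCauseException extends PotentialCauseException {
        CPUBusyPotentialCauseException(String message) { super(message); }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    // Chooses a reaction from the type of the nested cause.
    static String handle(TimeoutException e) {
        Throwable cause = e.getCause();
        if (cause instanceof NetworkPotentialCauseException)
            return "branch A: back off and retry";
        if (cause instanceof CPUBusyPotentialCauseException)
            return "branch B: reduce load";
        return "default: report the timeout";
    }

    public static void main(String[] args) {
        System.out.println(handle(new TimeoutException("expired",
            new NetworkPotentialCauseException("broker may be unreachable"))));
        System.out.println(handle(new TimeoutException("expired",
            new CPUBusyPotentialCauseException("sender thread may be starved"))));
        System.out.println(handle(new TimeoutException("expired", null)));
    }
}
```

A caller that only has an enriched message string would need to parse text to make the same decision, which is the trade-off against the extra allocation listed under "Cons".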
I spent quite a bit of time thinking through the direction of this PR.
What are your thoughts on it?
Please let me know. 🙇♂️
Thanks for your explanation. I agree that specific exception types could help developers catch the actual root cause. My question is: do we really need PotentialCauseException to be the base exception for NetworkPotentialCauseException and CPUBusyPotentialCauseException? Perhaps KafkaException is good enough.
Another exception design uses TimeoutException as the parent class, similar to e032a36. The benefit is simpler code, since developers wouldn't need to dig the root cause of a TimeoutException out of the cause of an ExecutionException 😄
Thanks for bringing up this great discussion. I'd like to see the Kafka exception hierarchy become more developer-friendly.
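To make the nesting concern concrete, here is a rough comparison of the two designs using a `CompletableFuture`. The classes are local, hypothetical stand-ins (`TimeoutException` here is not Kafka's class, and `NetworkTimeoutException` is an invented subclass name), so treat this as a sketch of the unwrapping depth only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class UnwrapDemo {
    // Hypothetical stand-in for a timeout exception that can carry a cause.
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical subclass, as in the "TimeoutException as parent class" design.
    static class NetworkTimeoutException extends TimeoutException {
        NetworkTimeoutException(String message) { super(message, null); }
    }

    // Nested-cause design: two levels of getCause() to reach the reason.
    static String reasonViaNestedCause(CompletableFuture<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            Throwable timeout = e.getCause();
            Throwable root = timeout == null ? null : timeout.getCause();
            return root == null ? "unknown" : root.getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Subclass design: the type of the first cause already names the reason.
    static String reasonViaSubclass(CompletableFuture<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> nested = new CompletableFuture<>();
        nested.completeExceptionally(
            new TimeoutException("expired", new IllegalStateException("connection closed")));
        System.out.println(reasonViaNestedCause(nested));

        CompletableFuture<Void> subclassed = new CompletableFuture<>();
        subclassed.completeExceptionally(new NetworkTimeoutException("expired"));
        System.out.println(reasonViaSubclass(subclassed));
    }
}
```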
@chia7712
Thank you for considering my proposal and for sharing your valuable thoughts.
As you mentioned, I also think it's a good idea to reuse an existing exception class.
Personally, I think using KafkaException would be the better approach.
Since certain code paths can have multiple potential causes for a TimeoutException, it might be difficult to pinpoint the exact cause. In such cases, it could be unclear which subclass of TimeoutException should be used. (e032a36)
So, my suggestion is as follows:
- Include a KafkaException as the root cause of the TimeoutException, and describe the possible scenario in the error message of the KafkaException.
- Let the detailed error information be available via the root cause of the TimeoutException.
- Keep the current message format of the TimeoutException itself (which currently only includes the elapsed time before it expired).
I'm thinking of revising the PR in this direction.
This way, I believe we can preserve the current semantics of TimeoutException while still conveying helpful contextual information (such as a potential cause) when necessary.
In the future, if there's a need to branch logic based on a more specific cause of the timeout, even if no actual exception was thrown, the developer could define a concrete PotentialCauseException class as needed.
Also, if this direction sounds reasonable, I think it wouldn't require a KIP.
What do you think?
Please share your opinion 🙇♂️
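A minimal sketch of the suggestion above, assuming the timeout keeps its elapsed-time message and nests a generic exception that describes the possible scenario. `KafkaException` and `TimeoutException` below are simplified local stand-ins that only mirror the names of Kafka's classes, and the wording of the cause message is hypothetical.

```java
class KafkaCauseDemo {
    // Simplified local stand-ins; they only mirror the names of Kafka's classes.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }

    static class TimeoutException extends KafkaException {
        TimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    // Keeps the elapsed-time message format and nests a descriptive cause.
    static TimeoutException expire(int recordCount, String topicPartition, long elapsedMs) {
        String errorMessage = "Expiring " + recordCount + " record(s) for " + topicPartition
            + ":" + elapsedMs + " ms has passed since batch creation";
        // Hypothetical wording for the possible scenario.
        KafkaException potentialCause = new KafkaException(
            "The batch may have expired because the broker was unavailable or the sender was stalled.");
        return new TimeoutException(errorMessage, potentialCause);
    }

    public static void main(String[] args) {
        TimeoutException e = expire(3, "orders-0", 120_000);
        System.out.println(e.getMessage());
        System.out.println("cause: " + e.getCause().getMessage());
    }
}
```

Existing code that matches on the timeout's message keeps working, and the extra context is reachable through `getCause()` without a new public exception type.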
The proposal of reusing KafkaException sounds good. +1 to it.
Sounds good to me also. There's no need to create a KIP for this approach and it gives just as much information to the user.
@AndrewJSchofield Thanks for your comments 🙇♂️ https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Thanks in advance!
Yes, that's right. You would follow the instructions in that wiki page and make a new KIP using the template. The discussion would proceed on the dev mailing list. For an interesting KIP, the community will generally not need any encouragement to get involved in the discussion.
54df407 to 66215de
@chia7712 @AndrewJSchofield @frankvicky Hi!
Changes
- Adds PotentialCauseException.
- An org.apache.kafka.common.errors.TimeoutException in KafkaProducer has a PotentialCauseException as its root cause if it cannot catch any exception.
Describe
TimeoutException can be thrown for various reasons. However, it is often difficult to identify the root cause, because so many potential factors can lead to a TimeoutException. For example:
- The ProducerClient might be busy, so it may not be able to send the request in time. As a result, some batches may expire, leading to a TimeoutException.
- The broker might be unavailable due to network issues or internal failures.
- [...] handles and responds to it, the response might arrive slightly late.
As shown above, there are many possible causes. In some cases, no exception is caught in the catch block, and a TimeoutException is thrown simply by comparing the elapsed time. However, the developer using TimeoutException in KafkaProducer likely already knows which specific reasons could cause it in that context. Therefore, I think it would be helpful to include a PotentialCauseException that reflects the likely reason, based on the developer's knowledge.