Commit 66215de

Addressing PR review.
1 parent abc86a6 commit 66215de

5 files changed: 20 additions (+20), 46 deletions (-46)

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 11 additions & 7 deletions
@@ -51,7 +51,6 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
-import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -673,7 +672,7 @@ public void initTransactions(boolean keepPreparedTxn) {
         TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
-            () -> new PotentialCauseException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
+            () -> new KafkaException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
         producerMetrics.recordInit(time.nanoseconds() - now);
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
@@ -763,7 +762,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
             result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
-                () -> new PotentialCauseException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
+                () -> new KafkaException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or "
+                    + "transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn "
+                    + "response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
@@ -849,7 +850,8 @@ public void commitTransaction() throws ProducerFencedException {
         TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
-            () -> new PotentialCauseException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
+            () -> new KafkaException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator "
+                + "within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
         producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
     }

@@ -885,7 +887,9 @@ public void abortTransaction() throws ProducerFencedException {
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
-            () -> new PotentialCauseException("AbortTransaction timed out – could not complete EndTxn(abort) with the transaction coordinator within max.block.ms (coordinator unavailable/rebalancing, network lag, or ACL denial)."));
+            () -> new KafkaException("AbortTransaction timed out – could not complete EndTxn(abort) "
+                + "with the transaction coordinator within max.block.ms "
+                + "(coordinator unavailable/rebalancing, network lag, or ACL denial)."));
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }

@@ -1232,7 +1236,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
                 }
                 if (ex.getCause() != null)
                     throw new TimeoutException(errorMessage, ex.getCause());
-                throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
+                throw new TimeoutException(errorMessage, new KafkaException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
             }
             cluster = metadata.fetch();
             elapsed = time.milliseconds() - nowMs;
@@ -1241,7 +1245,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
                 if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) {
                     throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
                 }
-                throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
+                throw new TimeoutException(errorMessage, new KafkaException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
             }
             metadata.maybeThrowExceptionForTopic(topic);
             remainingWaitMs = maxWaitMs - elapsed;
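
Net effect of these KafkaProducer hunks: the transactional calls still throw org.apache.kafka.common.errors.TimeoutException when max.block.ms expires, but the attached cause is now a plain KafkaException carrying the same diagnostic text, instead of the removed PotentialCauseException. A minimal caller-side sketch of what that looks like; the bootstrap address, topic name, and transactional id below are illustrative assumptions, not part of this change:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id");        // assumption
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                producer.commitTransaction();
            } catch (TimeoutException e) {
                // After this commit the cause set by the producer is a plain KafkaException
                // (previously PotentialCauseException); its message carries the diagnostic
                // text shown in the hunks above. Only the type of the cause changes.
                System.err.println("Transactional call timed out: " + e.getMessage());
                if (e.getCause() != null) {
                    System.err.println("Potential cause: " + e.getCause().getMessage());
                }
            }
        }
    }
}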

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

Lines changed: 1 addition & 2 deletions
@@ -27,7 +27,6 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -425,7 +424,7 @@ public Uuid clientInstanceId(Duration timeout) {
             if (injectTimeoutExceptionCounter > 0) {
                 --injectTimeoutExceptionCounter;
             }
-            throw new TimeoutException(new PotentialCauseException("TimeoutExceptions are successfully injected for test."));
+            throw new TimeoutException(new KafkaException("TimeoutExceptions are successfully injected for test."));
         }

         return clientInstanceId;

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,6 @@
 import org.apache.kafka.common.errors.FencedLeaderEpochException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
-import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -416,7 +415,9 @@ private long sendProducerData(long now) {
         for (ProducerBatch expiredBatch : expiredBatches) {
             String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                 + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
-            failBatch(expiredBatch, new TimeoutException(new PotentialCauseException(errorMessage)), false);
+            KafkaException potentialCause = new KafkaException(
+                "The broker might be unavailable or responding slowly, or the CPU might be busy.");
+            failBatch(expiredBatch, new TimeoutException(errorMessage, potentialCause), false);
             if (transactionManager != null && expiredBatch.inRetry()) {
                 // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                 transactionManager.markSequenceUnresolved(expiredBatch);
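
For batches that expire inside the accumulator, the hunk above moves the human-readable "Expiring N record(s) ..." text into the TimeoutException's message and attaches the broker/CPU hint as a KafkaException cause. A hedged sketch of a send callback that logs both parts; the class name is illustrative:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

public final class ExpiryLoggingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception instanceof TimeoutException) {
            // Message: "Expiring N record(s) for <topic-partition>: ... ms has passed since batch creation"
            System.err.println("Delivery timed out: " + exception.getMessage());
            if (exception.getCause() != null) {
                // Cause after this change: "The broker might be unavailable or responding slowly, or the CPU might be busy."
                System.err.println("Hint: " + exception.getCause().getMessage());
            }
        }
    }
}

It would be registered per record, e.g. producer.send(record, new ExpiryLoggingCallback()).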

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java

Lines changed: 5 additions & 5 deletions
@@ -17,8 +17,8 @@
 package org.apache.kafka.clients.producer.internals;


+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.PotentialCauseException;
 import org.apache.kafka.common.errors.TimeoutException;

 import java.util.concurrent.CountDownLatch;
@@ -50,19 +50,19 @@ public void done() {
     }

     public void await() {
-        this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new PotentialCauseException("Unknown reason."));
+        this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
     }

     public void await(long timeout, TimeUnit unit) {
-        this.await(timeout, unit, () -> new PotentialCauseException("Unknown reason."));
+        this.await(timeout, unit, () -> new KafkaException("Unknown reason."));
     }

-    public void await(long timeout, TimeUnit unit, Supplier<PotentialCauseException> potentialCauseException) {
+    public void await(long timeout, TimeUnit unit, Supplier<KafkaException> potentialCauseException) {
         try {
             boolean success = latch.await(timeout, unit);
             if (!success) {
                 throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
-                    "ms while awaiting " + operation, potentialCauseException.get());
+                        "ms while awaiting " + operation, potentialCauseException.get());
             }

             isAcked = true;
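
The three-argument await now takes a Supplier<KafkaException>, so the explanatory cause is only constructed on the timeout path. A self-contained sketch of the same lazy-cause pattern; the class below is a stand-in for illustration, not the internal TransactionalRequestResult:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.TimeoutException;

public final class AwaitWithCauseDemo {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Mirrors the new overload: the Supplier defers building the KafkaException
    // until the wait actually times out.
    public void await(long timeout, TimeUnit unit, Supplier<KafkaException> potentialCause) throws InterruptedException {
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) + "ms", potentialCause.get());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitWithCauseDemo demo = new AwaitWithCauseDemo();
        try {
            // Nothing counts the latch down, so this times out after 100 ms.
            demo.await(100, TimeUnit.MILLISECONDS,
                () -> new KafkaException("Coordinator never responded (illustrative cause)."));
        } catch (TimeoutException e) {
            System.err.println(e.getMessage() + " / potential cause: " + e.getCause().getMessage());
        }
    }
}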

clients/src/main/java/org/apache/kafka/common/errors/PotentialCauseException.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
