Skip to content

Commit a77073f

Browse files
Addressing PR review.
1 parent 0667079 commit a77073f

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ public void initTransactions(boolean keepPreparedTxn) {
673673
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
674674
sender.wakeup();
675675
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
676-
new PotentialCauseException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
676+
() -> new PotentialCauseException("InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial)."));
677677
producerMetrics.recordInit(time.nanoseconds() - now);
678678
transactionManager.maybeUpdateTransactionV2Enabled(true);
679679
}
@@ -763,7 +763,7 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
763763
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
764764
sender.wakeup();
765765
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
766-
new PotentialCauseException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
766+
() -> new PotentialCauseException("SendOffsetsToTransaction timed out – unable to reach the consumer-group or transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn response within max.block.ms (coordinator unavailable, rebalance in progress, network/ACL issue)."));
767767
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
768768
}
769769
}
@@ -849,7 +849,7 @@ public void commitTransaction() throws ProducerFencedException {
849849
TransactionalRequestResult result = transactionManager.beginCommit();
850850
sender.wakeup();
851851
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
852-
new PotentialCauseException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
852+
() -> new PotentialCauseException("CommitTransaction timed out – failed to complete EndTxn with the transaction coordinator within max.block.ms (coordinator unavailable, network lag, ACL/rebalance)."));
853853
producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
854854
}
855855

@@ -885,7 +885,7 @@ public void abortTransaction() throws ProducerFencedException {
885885
TransactionalRequestResult result = transactionManager.beginAbort();
886886
sender.wakeup();
887887
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
888-
new PotentialCauseException("AbortTransaction timed out – could not complete EndTxn(abort) with the transaction coordinator within max.block.ms (coordinator unavailable/rebalancing, network lag, or ACL denial)."));
888+
() -> new PotentialCauseException("AbortTransaction timed out – could not complete EndTxn(abort) with the transaction coordinator within max.block.ms (coordinator unavailable/rebalancing, network lag, or ACL denial)."));
889889
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
890890
}
891891

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.function.Supplier;
2627

2728
public final class TransactionalRequestResult {
2829
private final CountDownLatch latch;
@@ -56,16 +57,16 @@ public void await(long timeout, TimeUnit unit) {
5657
this.await(timeout, unit, null);
5758
}
5859

59-
public void await(long timeout, TimeUnit unit, PotentialCauseException potentialCauseException) {
60+
public void await(long timeout, TimeUnit unit, Supplier<PotentialCauseException> potentialCauseException) {
6061
try {
6162
boolean success = latch.await(timeout, unit);
6263
if (!success) {
6364
if (potentialCauseException == null) {
6465
throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
65-
"ms while awaiting " + operation);
66+
"ms while awaiting " + operation, new PotentialCauseException("Unknown reason."));
6667
} else {
6768
throw new TimeoutException("Timeout expired after " + unit.toMillis(timeout) +
68-
"ms while awaiting " + operation, potentialCauseException);
69+
"ms while awaiting " + operation, potentialCauseException.get());
6970
}
7071
}
7172

0 commit comments

Comments
 (0)