Skip to content

Commit 97fb8be

Browse files
committed
KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with Transactions V2 (#18467)
To avoid self-fencing in the commit/abort + empty abort scenario, return the concurrent transactions error when we have pending state and do the epoch check second. In this scenario, we will complete the previous transaction before proceeding to the empty abort. Added a test that failed before the change. Note -- only the pending state is checked earlier. This is because we don’t return from EndTxn (the first commit) until we already written to the log, transitioned to PrepareX, and have the pending CompleteX state. We don't need to worry about the cases of an EndTxn request coming in with PrepareX without the pending state because that would be an older request and/or retry which are already covered. Reviewers: Artem Livshits <[email protected]>, Jeff Kim <[email protected]>
1 parent 6974007 commit 97fb8be

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,10 +814,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
814814

815815
if (txnMetadata.producerId != producerId && !retryOnOverflow)
816816
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
817-
else if (!isValidEpoch)
818-
Left(Errors.PRODUCER_FENCED)
819817
else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
820818
Left(Errors.CONCURRENT_TRANSACTIONS)
819+
else if (!isValidEpoch)
820+
Left(Errors.PRODUCER_FENCED)
821821
else txnMetadata.state match {
822822
case Ongoing =>
823823
val nextState = if (txnMarkerResult == TransactionResult.COMMIT)

core/src/test/scala/integration/kafka/api/TransactionsTest.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,22 @@ class TransactionsTest extends IntegrationTestHarness {
924924
}
925925
}
926926

927+
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}")
928+
@CsvSource(Array(
929+
"kraft, consumer, true",
930+
))
931+
def testEmptyAbortAfterCommit(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
932+
val producer = transactionalProducers.head
933+
934+
producer.initTransactions()
935+
producer.beginTransaction()
936+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false))
937+
producer.commitTransaction()
938+
939+
producer.beginTransaction()
940+
producer.abortTransaction()
941+
}
942+
927943
private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
928944
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
929945
for (i <- start until end) {

0 commit comments

Comments
 (0)