@@ -21,9 +21,8 @@ import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
21
21
import kafka .utils .{TestInfoUtils , TestUtils }
22
22
import org .apache .kafka .clients .consumer ._
23
23
import org .apache .kafka .clients .producer .{KafkaProducer , ProducerRecord }
24
- import org .apache .kafka .common .TopicPartition
24
+ import org .apache .kafka .common .{ KafkaException , TopicPartition }
25
25
import org .apache .kafka .common .errors .{ConcurrentTransactionsException , InvalidProducerEpochException , ProducerFencedException , TimeoutException }
26
- import org .apache .kafka .common .test .api .Flaky
27
26
import org .apache .kafka .coordinator .group .GroupCoordinatorConfig
28
27
import org .apache .kafka .coordinator .transaction .{TransactionLogConfig , TransactionStateManagerConfig }
29
28
import org .apache .kafka .server .config .{ReplicationConfigs , ServerConfigs , ServerLogConfigs }
@@ -693,7 +692,6 @@ class TransactionsTest extends IntegrationTestHarness {
693
692
assertThrows(classOf [IllegalStateException ], () => producer.initTransactions())
694
693
}
695
694
696
- @ Flaky (" KAFKA-18035" )
697
695
@ ParameterizedTest
698
696
@ CsvSource (Array (
699
697
" kraft,classic,false" ,
@@ -733,6 +731,19 @@ class TransactionsTest extends IntegrationTestHarness {
733
731
restartDeadBrokers()
734
732
735
733
org.apache.kafka.test.TestUtils .assertFutureThrows(failedFuture, classOf [TimeoutException ])
734
+ // Ensure the producer transitions to abortable_error state.
735
+ TestUtils .waitUntilTrue(() => {
736
+ var failed = false
737
+ try {
738
+ producer.send(TestUtils .producerRecordWithExpectedTransactionStatus(testTopic, 0 , " 3" , " 3" , willBeCommitted = false ))
739
+ } catch {
740
+ case e : Exception =>
741
+ if (e.isInstanceOf [KafkaException ])
742
+ failed = true
743
+ }
744
+ failed
745
+ }, " The send request never failed as expected." )
746
+ assertThrows(classOf [KafkaException ], () => producer.send(TestUtils .producerRecordWithExpectedTransactionStatus(testTopic, 0 , " 3" , " 3" , willBeCommitted = false )))
736
747
producer.abortTransaction()
737
748
738
749
producer.beginTransaction()
@@ -755,7 +766,7 @@ class TransactionsTest extends IntegrationTestHarness {
755
766
producerStateEntry =
756
767
brokers(partitionLeader).logManager.getLog(new TopicPartition (testTopic, 0 )).get.producerStateManager.activeProducers.get(producerId)
757
768
assertNotNull(producerStateEntry)
758
- assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch)
769
+ assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch, " InitialProduceEpoch: " + initialProducerEpoch + " ProducerStateEntry: " + producerStateEntry )
759
770
} finally {
760
771
producer.close(Duration .ZERO )
761
772
}
0 commit comments