Skip to content

Commit b58b772

Browse files
committed
Fix Reactive Producer Tests (Fencing)
Use a different `transactional.id` for each test.
1 parent bd57099 commit b58b772

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.junit.jupiter.api.BeforeAll;
3838
import org.junit.jupiter.api.BeforeEach;
3939
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.TestInfo;
4041
import org.reactivestreams.Publisher;
4142
import org.reactivestreams.Subscription;
4243

@@ -102,17 +103,18 @@ public static void setUpBeforeClass() {
102103
}
103104

104105
@BeforeEach
105-
public void setUp() {
106-
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(),
106+
public void setUp(TestInfo info) {
107+
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(info),
107108
new MessagingMessageConverter());
108109
}
109110

110-
private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic() {
111+
private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic(TestInfo info) {
111112
Map<String, Object> senderProps =
112113
KafkaTestUtils.producerProps(EmbeddedKafkaCondition.getBroker().getBrokersAsString());
113114
SenderOptions<Integer, String> senderOptions = SenderOptions.create(senderProps);
114115
senderOptions = senderOptions
115-
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "reactive.transaction")
116+
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
117+
"reactive.transaction." + info.getDisplayName().replaceAll("\\(\\)", ""))
116118
.producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
117119
return senderOptions;
118120
}
@@ -270,7 +272,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
270272
.abort()
271273
.then(Mono.error(error))))
272274
.expectErrorMatches(throwable -> throwable instanceof IllegalStateException &&
273-
throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
275+
throwable.getMessage().equals("TransactionalId reactive.transaction."
276+
+ "shouldSendOneRecordTransactionallyViaTemplateAsSenderRecord"
277+
+ "AndReceiveItExactlyOnceWithException: Invalid transition " +
274278
"attempted from state READY to state ABORTING_TRANSACTION"))
275279
.verify(DEFAULT_VERIFY_TIMEOUT);
276280

0 commit comments

Comments
 (0)