Skip to content

Commit 2d7eeab

Browse files
authored
Merge pull request #1470 from garyrussell/ARBPDecorate
Decorate exceptions for AfterRollbackProcessor
1 parent f9535fc commit 2d7eeab

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,12 +1376,14 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
13761376
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
13771377
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
13781378

1379+
RuntimeException rollbackException = decorateException(e);
13791380
try {
13801381
if (recordList == null) {
1381-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
1382+
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, rollbackException,
1383+
false);
13821384
}
13831385
else {
1384-
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
1386+
afterRollbackProcessorToUse.process(recordList, this.consumer, rollbackException, false);
13851387
}
13861388
}
13871389
catch (KafkaException ke) {
@@ -1607,7 +1609,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
16071609
}
16081610
catch (RuntimeException e) {
16091611
this.logger.error(e, "Transaction rolled back");
1610-
recordAfterRollback(iterator, record, e);
1612+
recordAfterRollback(iterator, record, decorateException(e));
16111613
}
16121614
finally {
16131615
if (this.producerPerConsumerPartition) {
@@ -1838,8 +1840,8 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
18381840
}
18391841
}
18401842

1841-
private Exception decorateException(RuntimeException e) {
1842-
Exception toHandle = e;
1843+
private RuntimeException decorateException(RuntimeException e) {
1844+
RuntimeException toHandle = e;
18431845
if (toHandle instanceof ListenerExecutionFailedException) {
18441846
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
18451847
toHandle.getCause());

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
575575
public void testMaxFailures() throws Exception {
576576
logger.info("Start testMaxFailures");
577577
Map<String, Object> props = KafkaTestUtils.consumerProps("txTestMaxFailures", "false", embeddedKafka);
578-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
578+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupInARBP");
579579
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
580580
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
581581
ContainerProperties containerProps = new ContainerProperties(topic3);
@@ -650,9 +650,10 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
650650
Map<String, Object> map = new HashMap<>();
651651
mapper.toHeaders(dltRecord.headers(), map);
652652
MessageHeaders headers = new MessageHeaders(map);
653-
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class))).contains("RuntimeException");
653+
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class)))
654+
.contains("ListenerExecutionFailedException");
654655
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
655-
.isEqualTo("fail for max failures".getBytes());
656+
.contains("fail for max failures".getBytes());
656657
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
657658
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[3]).isEqualTo((byte) 0);
658659
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0);
@@ -663,7 +664,11 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
663664
pf.destroy();
664665
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
665666
verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
666-
verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), anyBoolean());
667+
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
668+
verify(afterRollbackProcessor, times(4)).process(any(), any(), captor.capture(), anyBoolean());
669+
assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class)
670+
.extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId())
671+
.isEqualTo("groupInARBP");
667672
verify(afterRollbackProcessor).clearThreadState();
668673
verify(dlTemplate).send(any(ProducerRecord.class));
669674
verify(dlTemplate).sendOffsetsToTransaction(

0 commit comments

Comments
 (0)