Skip to content

Commit e2cd1a1

Browse files
garyrussellartembilan
authored andcommitted
GH-1683: Improve Missing Ack Argument Exception
Resolves #1683 Error message was unclear when a non-MANUAL ack mode was used with an `Acknowledgment` parameter.
1 parent b022f9a commit e2cd1a1

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
5959
import org.springframework.messaging.converter.MessageConversionException;
6060
import org.springframework.messaging.handler.annotation.Header;
6161
import org.springframework.messaging.handler.annotation.Payload;
62+
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
6263
import org.springframework.messaging.support.GenericMessage;
6364
import org.springframework.messaging.support.MessageBuilder;
6465
import org.springframework.util.Assert;
@@ -341,6 +342,16 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
341342
"be invoked with the incoming message", message.getPayload()),
342343
new MessageConversionException("Cannot handle message", ex));
343344
}
345+
catch (MethodArgumentNotValidException ex) {
346+
if (this.hasAckParameter && acknowledgment == null) {
347+
throw new ListenerExecutionFailedException("invokeHandler Failed",
348+
new IllegalStateException("No Acknowledgment available as an argument, "
349+
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
350+
ex));
351+
}
352+
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
353+
"be invoked with the incoming message", message.getPayload()), ex);
354+
}
344355
catch (MessagingException ex) {
345356
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
346357
"be invoked with the incoming message", message.getPayload()), ex);

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,19 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
1920
import static org.mockito.BDDMockito.willReturn;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.verify;
2223

24+
import java.lang.reflect.Method;
25+
2326
import org.apache.kafka.clients.consumer.ConsumerRecord;
2427
import org.junit.jupiter.api.Test;
2528

29+
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
2630
import org.springframework.kafka.listener.AcknowledgingMessageListener;
31+
import org.springframework.kafka.listener.ListenerExecutionFailedException;
2732
import org.springframework.kafka.support.Acknowledgment;
2833
import org.springframework.kafka.support.converter.RecordMessageConverter;
2934
import org.springframework.messaging.support.GenericMessage;
@@ -36,7 +41,7 @@
3641
public class MessagingMessageListenerAdapterTests {
3742

3843
@Test
39-
public void testFallbackType() {
44+
void testFallbackType() {
4045
final class MyAdapter extends MessagingMessageListenerAdapter<String, String>
4146
implements AcknowledgingMessageListener<String, String> {
4247

@@ -61,4 +66,22 @@ public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknow
6166
verify(converter).toMessage(cr, ack, null, String.class);
6267
}
6368

69+
@Test
70+
void testMissingAck() throws NoSuchMethodException, SecurityException {
71+
KafkaListenerAnnotationBeanPostProcessor<String, String> bpp = new KafkaListenerAnnotationBeanPostProcessor<>();
72+
Method method = getClass().getDeclaredMethod("test", Acknowledgment.class);
73+
RecordMessagingMessageListenerAdapter<String, String> adapter =
74+
new RecordMessagingMessageListenerAdapter<>(this, method);
75+
adapter.setHandlerMethod(
76+
new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method)));
77+
assertThatExceptionOfType(ListenerExecutionFailedException.class).isThrownBy(() -> adapter.onMessage(
78+
new ConsumerRecord<>("foo", 0, 0L, null, "foo"), null, null))
79+
.withCauseExactlyInstanceOf(IllegalStateException.class)
80+
.withMessageContaining("MANUAL");
81+
}
82+
83+
public void test(Acknowledgment ack) {
84+
85+
}
86+
6487
}

0 commit comments

Comments
 (0)