Skip to content

Commit 9f8576f

Browse files
committed
GH-1683: Refactoring
Simplify complexity. Add nullable to ack.
1 parent e2cd1a1 commit 9f8576f

File tree

4 files changed

+29
-24
lines changed

4 files changed

+29
-24
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

2424
import org.springframework.kafka.support.Acknowledgment;
25+
import org.springframework.lang.Nullable;
2526

2627
/**
2728
* Listener for handling a batch of incoming Kafka messages, propagating an acknowledgment
@@ -50,6 +51,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
5051
}
5152

5253
@Override
53-
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
54+
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer);
5455

5556
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.kafka.support.Acknowledgment;
3131
import org.springframework.kafka.support.converter.BatchMessageConverter;
3232
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
33+
import org.springframework.lang.Nullable;
3334
import org.springframework.messaging.Message;
3435
import org.springframework.messaging.support.GenericMessage;
3536
import org.springframework.messaging.support.MessageBuilder;
@@ -134,7 +135,9 @@ public void onMessage(ConsumerRecords<K, V> records, Acknowledgment acknowledgme
134135
* @param consumer the consumer.
135136
*/
136137
@Override
137-
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
138+
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
139+
Consumer<?, ?> consumer) {
140+
138141
Message<?> message;
139142
if (!isConsumerRecordList()) {
140143
if (isMessageList() || this.batchToRecordAdapter != null) {
@@ -162,7 +165,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
162165
invoke(records, acknowledgment, consumer, message);
163166
}
164167

165-
protected void invoke(Object records, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
168+
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
166169
final Message<?> messageArg) {
167170

168171
Message<?> message = messageArg;
@@ -196,7 +199,9 @@ protected void invoke(Object records, Acknowledgment acknowledgment, Consumer<?,
196199
}
197200

198201
@SuppressWarnings({ "unchecked", "rawtypes" })
199-
protected Message<?> toMessagingMessage(List records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
202+
protected Message<?> toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment,
203+
Consumer<?, ?> consumer) {
204+
200205
return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType());
201206
}
202207

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
300300
}
301301
}
302302

303-
protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment,
303+
protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
304304
Consumer<?, ?> consumer) {
305305
return getMessageConverter().toMessage(record, acknowledgment, consumer, getType());
306306
}
@@ -314,7 +314,7 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgm
314314
* @param consumer the consumer.
315315
* @return the result of invocation.
316316
*/
317-
protected final Object invokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message,
317+
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
318318
Consumer<?, ?> consumer) {
319319

320320
try {
@@ -332,25 +332,10 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
332332
}
333333
}
334334
catch (org.springframework.messaging.converter.MessageConversionException ex) {
335-
if (this.hasAckParameter && acknowledgment == null) {
336-
throw new ListenerExecutionFailedException("invokeHandler Failed",
337-
new IllegalStateException("No Acknowledgment available as an argument, "
338-
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
339-
ex));
340-
}
341-
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
342-
"be invoked with the incoming message", message.getPayload()),
343-
new MessageConversionException("Cannot handle message", ex));
335+
throw checkAckArg(acknowledgment, message, new MessageConversionException("Cannot handle message", ex));
344336
}
345337
catch (MethodArgumentNotValidException ex) {
346-
if (this.hasAckParameter && acknowledgment == null) {
347-
throw new ListenerExecutionFailedException("invokeHandler Failed",
348-
new IllegalStateException("No Acknowledgment available as an argument, "
349-
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
350-
ex));
351-
}
352-
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
353-
"be invoked with the incoming message", message.getPayload()), ex);
338+
throw checkAckArg(acknowledgment, message, ex);
354339
}
355340
catch (MessagingException ex) {
356341
throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
@@ -362,6 +347,17 @@ protected final Object invokeHandler(Object data, Acknowledgment acknowledgment,
362347
}
363348
}
364349

350+
private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Message<?> message, Exception ex) {
351+
if (this.hasAckParameter && acknowledgment == null) {
352+
return new ListenerExecutionFailedException("invokeHandler Failed",
353+
new IllegalStateException("No Acknowledgment available as an argument, "
354+
+ "the listener container must have a MANUAL AckMode to populate the Acknowledgment.",
355+
ex));
356+
}
357+
return new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " +
358+
"be invoked with the incoming message", message.getPayload()), ex);
359+
}
360+
365361
/**
366362
* Handle the given result object returned from the listener method, sending a
367363
* response message to the SendTo topic.

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
2626
import org.springframework.kafka.listener.ListenerExecutionFailedException;
2727
import org.springframework.kafka.support.Acknowledgment;
28+
import org.springframework.lang.Nullable;
2829
import org.springframework.messaging.Message;
2930
import org.springframework.messaging.support.GenericMessage;
3031

@@ -71,7 +72,9 @@ public RecordMessagingMessageListenerAdapter(Object bean, Method method, KafkaLi
7172
* @param consumer the consumer.
7273
*/
7374
@Override
74-
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
75+
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
76+
Consumer<?, ?> consumer) {
77+
7578
Message<?> message;
7679
if (isConversionNeeded()) {
7780
message = toMessagingMessage(record, acknowledgment, consumer);

0 commit comments

Comments
 (0)