Skip to content

Commit 083aff6

Browse files
authored
GH-2566: Both Converters in Container Factory
Resolves #2566 Now that both batch and record listners can be created by a single factory, it should be possible to configure both a `RecordMessageConverter` and `BatchMessageConverter`. Previously, only a single `MessageConverter` could be configured. **cherry-pick to 2.9.x** * Add doc changes.
1 parent 2ef00e5 commit 083aff6

File tree

8 files changed

+72
-19
lines changed

8 files changed

+72
-19
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,9 @@ public KafkaListenerContainerFactory<?> batchFactory() {
15091509
NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
15101510
This, together with the changes to <<error-handlers>> allows the same factory to be used for both record and batch listeners.
15111511

1512+
NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties.
1513+
Previously, there was only one property `messageConverter` which applied to both record and batch listeners.
1514+
15121515
The following example shows how to receive a list of payloads:
15131516

15141517
====
@@ -4395,7 +4398,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
43954398
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
43964399
new ConcurrentKafkaListenerContainerFactory<>();
43974400
factory.setConsumerFactory(consumerFactory());
4398-
factory.setMessageConverter(new JsonMessageConverter());
4401+
factory.setRecordMessageConverter(new JsonMessageConverter());
43994402
return factory;
44004403
}
44014404
...
@@ -4677,7 +4680,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
46774680
new ConcurrentKafkaListenerContainerFactory<>();
46784681
factory.setConsumerFactory(consumerFactory());
46794682
factory.setBatchListener(true);
4680-
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
4683+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
46814684
return factory;
46824685
}
46834686

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,7 +47,9 @@
4747
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
4848
import org.springframework.kafka.support.JavaUtils;
4949
import org.springframework.kafka.support.TopicPartitionOffset;
50+
import org.springframework.kafka.support.converter.BatchMessageConverter;
5051
import org.springframework.kafka.support.converter.MessageConverter;
52+
import org.springframework.kafka.support.converter.RecordMessageConverter;
5153
import org.springframework.util.Assert;
5254

5355
/**
@@ -82,7 +84,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
8284

8385
private Integer phase;
8486

85-
private MessageConverter messageConverter;
87+
private RecordMessageConverter recordMessageConverter;
88+
89+
private BatchMessageConverter batchMessageConverter;
8690

8791
private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;
8892

@@ -154,9 +158,38 @@ public void setPhase(int phase) {
154158
/**
155159
* Set the message converter to use if dynamic argument type matching is needed.
156160
* @param messageConverter the converter.
161+
* @deprecated since 2.9.6 in favor of
162+
* {@link #setBatchMessageConverter(BatchMessageConverter)} and
163+
* {@link #setRecordMessageConverter(RecordMessageConverter)}.
157164
*/
165+
@Deprecated
158166
public void setMessageConverter(MessageConverter messageConverter) {
159-
this.messageConverter = messageConverter;
167+
if (messageConverter instanceof RecordMessageConverter) {
168+
setRecordMessageConverter((RecordMessageConverter) messageConverter);
169+
}
170+
else {
171+
setBatchMessageConverter((BatchMessageConverter) messageConverter);
172+
}
173+
}
174+
175+
/**
176+
* Set the message converter to use if dynamic argument type matching is needed for
177+
* record listeners.
178+
* @param recordMessageConverter the converter.
179+
* @since 2.9.6
180+
*/
181+
public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
182+
this.recordMessageConverter = recordMessageConverter;
183+
}
184+
185+
/**
186+
* Set the message converter to use if dynamic argument type matching is needed for
187+
* batch listeners.
188+
* @param batchMessageConverter the converter.
189+
* @since 2.9.6
190+
*/
191+
public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
192+
this.batchMessageConverter = batchMessageConverter;
160193
}
161194

162195
/**
@@ -391,7 +424,12 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
391424
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
392425
}
393426

394-
endpoint.setupListenerContainer(instance, this.messageConverter);
427+
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {
428+
endpoint.setupListenerContainer(instance, this.batchMessageConverter);
429+
}
430+
else {
431+
endpoint.setupListenerContainer(instance, this.recordMessageConverter);
432+
}
395433
initializeContainer(instance, endpoint);
396434
customizeContainer(instance);
397435
return instance;

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -282,6 +282,7 @@ public boolean isBatchListener() {
282282
* @return the batch listener flag.
283283
* @since 2.8
284284
*/
285+
@Override
285286
@Nullable
286287
public Boolean getBatchListener() {
287288
return this.batchListener;

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -165,4 +165,15 @@ default String getMainListenerId() {
165165
return null;
166166
}
167167

168+
/**
169+
* Return the current batch listener flag for this endpoint, or null if not explicitly
170+
* set.
171+
* @return the batch listener flag.
172+
* @since 2.9.6
173+
*/
174+
@Nullable
175+
default Boolean getBatchListener() {
176+
return null;
177+
}
178+
168179
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -173,7 +173,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKa
173173
new ConcurrentKafkaListenerContainerFactory<>();
174174
factory.setConsumerFactory(consumerFactory(embeddedKafka));
175175
factory.setBatchListener(true);
176-
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
176+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
177177
factory.setReplyTemplate(template(embeddedKafka));
178178
DefaultErrorHandler eh = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template));
179179
factory.setCommonErrorHandler(eh);

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1091,7 +1091,7 @@ public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record,
10911091
}
10921092
});
10931093
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo"));
1094-
factory.setMessageConverter(new RecordMessageConverter() {
1094+
factory.setRecordMessageConverter(new RecordMessageConverter() {
10951095

10961096
@Override
10971097
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
@@ -1137,7 +1137,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
11371137
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
11381138
typeMapper.addTrustedPackages("*");
11391139
converter.setTypeMapper(typeMapper);
1140-
factory.setMessageConverter(converter);
1140+
factory.setRecordMessageConverter(converter);
11411141
return factory;
11421142
}
11431143

@@ -1154,7 +1154,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory2() {
11541154
typeMapper.addTrustedPackages("*");
11551155
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
11561156
converter.setTypeMapper(typeMapper);
1157-
factory.setMessageConverter(converter);
1157+
factory.setRecordMessageConverter(converter);
11581158
return factory;
11591159
}
11601160

@@ -1167,7 +1167,7 @@ public KafkaListenerContainerFactory<?> projectionListenerContainerFactory() {
11671167
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
11681168
typeMapper.addTrustedPackages("*");
11691169
converter.setTypeMapper(typeMapper);
1170-
factory.setMessageConverter(new ProjectingMessageConverter(converter));
1170+
factory.setRecordMessageConverter(new ProjectingMessageConverter(converter));
11711171
factory.setChangeConsumerThreadName(true);
11721172
factory.setThreadNameSupplier(container -> "foo." + container.getListenerId());
11731173
return factory;

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -165,7 +165,7 @@ public ConsumerFactory consumerFactory() {
165165
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
166166
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
167167
factory.setConsumerFactory(consumerFactory());
168-
factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
168+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
169169
factory.setBatchListener(true);
170170
return factory;
171171
}

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -900,7 +900,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> simpleMapperFact
900900
factory.setReplyTemplate(template());
901901
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
902902
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
903-
factory.setMessageConverter(messageConverter);
903+
factory.setRecordMessageConverter(messageConverter);
904904
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
905905

906906
@Override

0 commit comments

Comments
 (0)