Skip to content

Commit b902b63

Browse files
garyrussellartembilan
authored andcommitted
GH-2566: Both Converters in Container Factory
Resolves #2570 Now that both batch and record listners can be created by a single factory, it should be possible to configure both a `RecordMessageConverter` and `BatchMessageConverter`. Previously, only a single `MessageConverter` could be configured. **cherry-pick to 2.9.x** * Add doc changes. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java # spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java # spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
1 parent 2719710 commit b902b63

File tree

8 files changed

+72
-19
lines changed

8 files changed

+72
-19
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,6 +1573,9 @@ public KafkaListenerContainerFactory<?> batchFactory() {
15731573
NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
15741574
This, together with the changes to <<error-handlers>> allows the same factory to be used for both record and batch listeners.
15751575

1576+
NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties.
1577+
Previously, there was only one property `messageConverter` which applied to both record and batch listeners.
1578+
15761579
The following example shows how to receive a list of payloads:
15771580

15781581
====
@@ -4403,7 +4406,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
44034406
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
44044407
new ConcurrentKafkaListenerContainerFactory<>();
44054408
factory.setConsumerFactory(consumerFactory());
4406-
factory.setMessageConverter(new JsonMessageConverter());
4409+
factory.setRecordMessageConverter(new JsonMessageConverter());
44074410
return factory;
44084411
}
44094412
...
@@ -4685,7 +4688,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
46854688
new ConcurrentKafkaListenerContainerFactory<>();
46864689
factory.setConsumerFactory(consumerFactory());
46874690
factory.setBatchListener(true);
4688-
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
4691+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
46894692
return factory;
46904693
}
46914694

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,9 @@
4848
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
4949
import org.springframework.kafka.support.JavaUtils;
5050
import org.springframework.kafka.support.TopicPartitionOffset;
51+
import org.springframework.kafka.support.converter.BatchMessageConverter;
5152
import org.springframework.kafka.support.converter.MessageConverter;
53+
import org.springframework.kafka.support.converter.RecordMessageConverter;
5254
import org.springframework.retry.RecoveryCallback;
5355
import org.springframework.retry.support.RetryTemplate;
5456
import org.springframework.util.Assert;
@@ -84,7 +86,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
8486

8587
private Integer phase;
8688

87-
private MessageConverter messageConverter;
89+
private RecordMessageConverter recordMessageConverter;
90+
91+
private BatchMessageConverter batchMessageConverter;
8892

8993
private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;
9094

@@ -156,9 +160,38 @@ public void setPhase(int phase) {
156160
/**
157161
* Set the message converter to use if dynamic argument type matching is needed.
158162
* @param messageConverter the converter.
163+
* @deprecated since 2.9.6 in favor of
164+
* {@link #setBatchMessageConverter(BatchMessageConverter)} and
165+
* {@link #setRecordMessageConverter(RecordMessageConverter)}.
159166
*/
167+
@Deprecated
160168
public void setMessageConverter(MessageConverter messageConverter) {
161-
this.messageConverter = messageConverter;
169+
if (messageConverter instanceof RecordMessageConverter) {
170+
setRecordMessageConverter((RecordMessageConverter) messageConverter);
171+
}
172+
else {
173+
setBatchMessageConverter((BatchMessageConverter) messageConverter);
174+
}
175+
}
176+
177+
/**
178+
* Set the message converter to use if dynamic argument type matching is needed for
179+
* record listeners.
180+
* @param recordMessageConverter the converter.
181+
* @since 2.9.6
182+
*/
183+
public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
184+
this.recordMessageConverter = recordMessageConverter;
185+
}
186+
187+
/**
188+
* Set the message converter to use if dynamic argument type matching is needed for
189+
* batch listeners.
190+
* @param batchMessageConverter the converter.
191+
* @since 2.9.6
192+
*/
193+
public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
194+
this.batchMessageConverter = batchMessageConverter;
162195
}
163196

164197
/**
@@ -393,7 +426,12 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
393426
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
394427
}
395428

396-
endpoint.setupListenerContainer(instance, this.messageConverter);
429+
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {
430+
endpoint.setupListenerContainer(instance, this.batchMessageConverter);
431+
}
432+
else {
433+
endpoint.setupListenerContainer(instance, this.recordMessageConverter);
434+
}
397435
initializeContainer(instance, endpoint);
398436
customizeContainer(instance);
399437
return instance;

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -274,6 +274,7 @@ public boolean isBatchListener() {
274274
* @return the batch listener flag.
275275
* @since 2.8
276276
*/
277+
@Override
277278
@Nullable
278279
public Boolean getBatchListener() {
279280
return this.batchListener;

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -155,4 +155,15 @@ default byte[] getListenerInfo() {
155155
return null;
156156
}
157157

158+
/**
159+
* Return the current batch listener flag for this endpoint, or null if not explicitly
160+
* set.
161+
* @return the batch listener flag.
162+
* @since 2.9.6
163+
*/
164+
@Nullable
165+
default Boolean getBatchListener() {
166+
return null;
167+
}
168+
158169
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -160,7 +160,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKa
160160
new ConcurrentKafkaListenerContainerFactory<>();
161161
factory.setConsumerFactory(consumerFactory(embeddedKafka));
162162
factory.setBatchListener(true);
163-
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
163+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
164164
factory.setReplyTemplate(template(embeddedKafka));
165165
DefaultErrorHandler eh = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template));
166166
factory.setCommonErrorHandler(eh);

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1091,7 +1091,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
10911091
c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
10921092
});
10931093
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo"));
1094-
factory.setMessageConverter(new RecordMessageConverter() {
1094+
factory.setRecordMessageConverter(new RecordMessageConverter() {
10951095

10961096
@Override
10971097
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
@@ -1137,7 +1137,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
11371137
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
11381138
typeMapper.addTrustedPackages("*");
11391139
converter.setTypeMapper(typeMapper);
1140-
factory.setMessageConverter(converter);
1140+
factory.setRecordMessageConverter(converter);
11411141
return factory;
11421142
}
11431143

@@ -1154,7 +1154,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory2() {
11541154
typeMapper.addTrustedPackages("*");
11551155
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
11561156
converter.setTypeMapper(typeMapper);
1157-
factory.setMessageConverter(converter);
1157+
factory.setRecordMessageConverter(converter);
11581158
return factory;
11591159
}
11601160

@@ -1167,7 +1167,7 @@ public KafkaListenerContainerFactory<?> projectionListenerContainerFactory() {
11671167
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
11681168
typeMapper.addTrustedPackages("*");
11691169
converter.setTypeMapper(typeMapper);
1170-
factory.setMessageConverter(new ProjectingMessageConverter(converter));
1170+
factory.setRecordMessageConverter(new ProjectingMessageConverter(converter));
11711171
return factory;
11721172
}
11731173

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -165,7 +165,7 @@ public ConsumerFactory consumerFactory() {
165165
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
166166
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
167167
factory.setConsumerFactory(consumerFactory());
168-
factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
168+
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
169169
factory.setBatchListener(true);
170170
return factory;
171171
}

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -792,7 +792,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> simpleMapperFact
792792
factory.setReplyTemplate(template());
793793
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
794794
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
795-
factory.setMessageConverter(messageConverter);
795+
factory.setRecordMessageConverter(messageConverter);
796796
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
797797

798798
@Override

0 commit comments

Comments
 (0)