Skip to content

Commit 7dfac08

Browse files
committed
jspecify nullability changes in support.converter package
#3762 Signed-off-by: Soby Chacko <[email protected]>
1 parent 23eed85 commit 7dfac08

11 files changed

+56
-38
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
7979

8080
private boolean generateTimestamp = false;
8181

82-
private KafkaHeaderMapper headerMapper;
82+
private @Nullable KafkaHeaderMapper headerMapper;
8383

8484
private boolean rawRecordHeader;
8585

@@ -232,15 +232,17 @@ private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object>
232232
}
233233
}
234234

235-
private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
235+
private @Nullable Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
236236
return this.recordConverter == null || !containerType(type)
237237
? extractAndConvertValue(record, type)
238238
: convert(record, type, conversionFailures);
239239
}
240240

241241
private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
242242
Map<String, Object> converted = new HashMap<>();
243-
this.headerMapper.toHeaders(headers, converted);
243+
if (this.headerMapper != null) {
244+
this.headerMapper.toHeaders(headers, converted);
245+
}
244246
convertedHeaders.add(converted);
245247
return converted;
246248
}
@@ -269,12 +271,17 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
269271
* @param conversionFailures Conversion failures.
270272
* @return the converted payload.
271273
*/
272-
protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
274+
protected @Nullable Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
273275
try {
274-
Object payload = this.recordConverter
275-
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
276-
conversionFailures.add(null);
277-
return payload;
276+
if (this.recordConverter != null) {
277+
Object payload = this.recordConverter
278+
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
279+
conversionFailures.add(null);
280+
return payload;
281+
}
282+
else {
283+
return null;
284+
}
278285
}
279286
catch (ConversionException ex) {
280287
byte[] original = null;

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import com.fasterxml.jackson.core.JsonProcessingException;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.kafka.support.KafkaNull;
2324
import org.springframework.messaging.Message;
@@ -44,7 +45,7 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) {
4445
}
4546

4647
@Override
47-
protected Object convertPayload(Message<?> message) {
48+
protected @Nullable Object convertPayload(Message<?> message) {
4849
try {
4950
return message.getPayload() instanceof KafkaNull
5051
? null

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.core.JsonProcessingException;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import org.apache.kafka.common.utils.Bytes;
22+
import org.jspecify.annotations.Nullable;
2223

2324
import org.springframework.kafka.support.KafkaNull;
2425
import org.springframework.messaging.Message;
@@ -45,7 +46,7 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) {
4546
}
4647

4748
@Override
48-
protected Object convertPayload(Message<?> message) {
49+
protected @Nullable Object convertPayload(Message<?> message) {
4950
try {
5051
return message.getPayload() instanceof KafkaNull
5152
? null

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ConversionException.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,18 @@
3535
@SuppressWarnings("serial")
3636
public class ConversionException extends KafkaException {
3737

38-
private transient ConsumerRecord<?, ?> record;
38+
private transient @Nullable ConsumerRecord<?, ?> record;
3939

40-
private transient List<ConsumerRecord<?, ?>> records = new ArrayList<>();
40+
private transient @Nullable List<ConsumerRecord<?, ?>> records = new ArrayList<>();
4141

42-
private transient Message<?> message;
42+
private transient @Nullable Message<?> message;
4343

4444
/**
4545
* Construct an instance with the provided properties.
4646
* @param message A text message describing the reason.
4747
* @param cause the cause.
4848
*/
49-
public ConversionException(String message, Throwable cause) {
49+
public ConversionException(String message, @Nullable Throwable cause) {
5050
super(message, cause);
5151
this.record = null;
5252
this.message = null;
@@ -75,7 +75,9 @@ public ConversionException(String message, ConsumerRecord<?, ?> record, Throwabl
7575
public ConversionException(String message, List<ConsumerRecord<?, ?>> records, Throwable cause) {
7676
super(message, cause);
7777
this.record = null;
78-
this.records.addAll(records);
78+
if (this.records != null) {
79+
this.records.addAll(records);
80+
}
7981
this.message = null;
8082
}
8183

spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.common.header.Headers;
2727
import org.apache.kafka.common.header.internals.RecordHeaders;
2828
import org.apache.kafka.common.utils.Bytes;
29+
import org.jspecify.annotations.Nullable;
2930

3031
import org.springframework.kafka.support.JacksonUtils;
3132
import org.springframework.kafka.support.KafkaNull;
@@ -91,13 +92,13 @@ protected Headers initialRecordHeaders(Message<?> message) {
9192
}
9293

9394
@Override
94-
protected Object convertPayload(Message<?> message) {
95+
protected @Nullable Object convertPayload(Message<?> message) {
9596
throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value "
9697
+ "corresponding to the configured Kafka Serializer");
9798
}
9899

99100
@Override
100-
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
101+
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, @Nullable Type type) {
101102
Object value = record.value();
102103
if (record.value() == null) {
103104
return KafkaNull.INSTANCE;
@@ -128,7 +129,7 @@ else if (value instanceof byte[]) {
128129
}
129130
}
130131

131-
private JavaType determineJavaType(ConsumerRecord<?, ?> record, Type type) {
132+
private JavaType determineJavaType(ConsumerRecord<?, ?> record, @Nullable Type type) {
132133
JavaType javaType = this.typeMapper.getTypePrecedence().equals(TypePrecedence.INFERRED) && type != null
133134
? TypeFactory.defaultInstance().constructType(type)
134135
: this.typeMapper.toJavaType(record.headers());

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ static String getGroupId() {
5656
* @param timestampType the timestamp type.
5757
* @param timestamp the timestamp.
5858
*/
59-
default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consumer, Map<String, Object> rawHeaders,
59+
default void commonHeaders(@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, Map<String, Object> rawHeaders,
6060
@Nullable Object theKey, Object topic, Object partition, Object offset,
6161
@Nullable Object timestampType, Object timestamp) {
6262

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.common.header.Header;
3030
import org.apache.kafka.common.header.Headers;
3131
import org.apache.kafka.common.header.internals.RecordHeaders;
32+
import org.jspecify.annotations.Nullable;
3233

3334
import org.springframework.core.log.LogAccessor;
3435
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
@@ -62,7 +63,7 @@ public class MessagingMessageConverter implements RecordMessageConverter {
6263

6364
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
6465

65-
private final Function<Message<?>, Integer> partitionProvider;
66+
private final Function<Message<?>, @Nullable Integer> partitionProvider;
6667

6768
private boolean generateMessageId = false;
6869

@@ -72,7 +73,7 @@ public class MessagingMessageConverter implements RecordMessageConverter {
7273

7374
private boolean rawRecordHeader;
7475

75-
private SmartMessageConverter messagingConverter;
76+
private @Nullable SmartMessageConverter messagingConverter;
7677

7778
/**
7879
* Construct an instance that uses the {@link KafkaHeaders#PARTITION} to determine the
@@ -88,7 +89,7 @@ public MessagingMessageConverter() {
8889
* @param partitionProvider the provider.
8990
* @since 3.0.8
9091
*/
91-
public MessagingMessageConverter(Function<Message<?>, Integer> partitionProvider) {
92+
public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partitionProvider) {
9293
Assert.notNull(partitionProvider, "'partitionProvider' cannot be null");
9394
if (JacksonPresent.isJackson2Present()) {
9495
this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -136,6 +137,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
136137
this.rawRecordHeader = rawRecordHeader;
137138
}
138139

140+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
139141
protected org.springframework.messaging.converter.MessageConverter getMessagingConverter() {
140142
return this.messagingConverter;
141143
}
@@ -165,16 +167,16 @@ protected org.springframework.messaging.converter.MessageConverter getMessagingC
165167
* @param messagingConverter the converter.
166168
* @since 2.7.1
167169
*/
168-
public void setMessagingConverter(SmartMessageConverter messagingConverter) {
170+
public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
169171
this.messagingConverter = messagingConverter;
170172
if (messagingConverter != null && this.headerMapper instanceof AbstractKafkaHeaderMapper) {
171173
((AbstractKafkaHeaderMapper) this.headerMapper).addRawMappedHeader(MessageHeaders.CONTENT_TYPE, true);
172174
}
173175
}
174176

175177
@Override
176-
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
177-
Type type) {
178+
public Message<?> toMessage(ConsumerRecord<?, ?> record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer,
179+
@Nullable Type type) {
178180

179181
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
180182
this.generateTimestamp);
@@ -272,7 +274,7 @@ protected Headers initialRecordHeaders(Message<?> message) {
272274
* @param message the message.
273275
* @return the payload.
274276
*/
275-
protected Object convertPayload(Message<?> message) {
277+
protected @Nullable Object convertPayload(Message<?> message) {
276278
Object payload = message.getPayload();
277279
if (payload instanceof KafkaNull) {
278280
return null;
@@ -289,7 +291,7 @@ protected Object convertPayload(Message<?> message) {
289291
* @param type the required type.
290292
* @return the value.
291293
*/
292-
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
294+
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, @Nullable Type type) {
293295
return record.value() == null ? KafkaNull.INSTANCE : record.value();
294296
}
295297

spring-kafka/src/main/java/org/springframework/kafka/support/converter/ProjectingMessageConverter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
2626
import org.apache.kafka.clients.consumer.ConsumerRecord;
2727
import org.apache.kafka.common.utils.Bytes;
28+
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.core.ResolvableType;
3031
import org.springframework.data.projection.MethodInterceptorFactory;
@@ -99,12 +100,12 @@ public ProjectingMessageConverter(ObjectMapper mapper, MessagingMessageConverter
99100
}
100101

101102
@Override
102-
protected Object convertPayload(Message<?> message) {
103+
protected @Nullable Object convertPayload(Message<?> message) {
103104
return this.delegate.convertPayload(message);
104105
}
105106

106107
@Override
107-
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
108+
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, @Nullable Type type) {
108109
Object value = record.value();
109110

110111
if (value == null) {

spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323
import org.apache.kafka.clients.producer.ProducerRecord;
2424
import org.jspecify.annotations.NonNull;
25+
import org.jspecify.annotations.Nullable;
2526

2627
import org.springframework.kafka.support.Acknowledgment;
2728
import org.springframework.messaging.Message;
@@ -43,8 +44,8 @@ public interface RecordMessageConverter extends MessageConverter {
4344
* @return the message.
4445
*/
4546
@NonNull
46-
Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
47-
Type payloadType);
47+
Message<?> toMessage(ConsumerRecord<?, ?> record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer,
48+
@Nullable Type payloadType);
4849

4950
/**
5051
* Convert a message to a producer record.

spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import com.fasterxml.jackson.core.JsonProcessingException;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.kafka.support.KafkaNull;
2324
import org.springframework.messaging.Message;
@@ -44,7 +45,7 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) {
4445
}
4546

4647
@Override
47-
protected Object convertPayload(Message<?> message) {
48+
protected @Nullable Object convertPayload(Message<?> message) {
4849
try {
4950
return message.getPayload() instanceof KafkaNull
5051
? null
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Package for kafka converters
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.kafka.support.converter;

0 commit comments

Comments
 (0)