Skip to content

Commit 3443e20

Browse files
garyrussellartembilan
authored andcommitted
GH-2779: ParseStringDeserializer: Add Null Check
Resolves #2779 Handle tombstone records. **cherry-pick to 2.9.x** * Delegate null handling to the supplied function. (cherry picked from commit 701ed82)
1 parent e1e9d10 commit 3443e20

File tree

2 files changed

+19
-6
lines changed

2 files changed

+19
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,15 +67,17 @@ public ParseStringDeserializer() {
6767
}
6868

6969
/**
70-
* Construct an instance with the supplied parser function.
70+
* Construct an instance with the supplied parser function. The function may receive
71+
* null as the input value, for example for a tombstone record in a compacted topic.
7172
* @param parser the function.
7273
*/
7374
public ParseStringDeserializer(Function<String, T> parser) {
7475
this.parser = (message, ignoredHeaders) -> parser.apply(message);
7576
}
7677

7778
/**
78-
* Construct an instance with the supplied parser function.
79+
* Construct an instance with the supplied parser function. The function may receive
80+
* null as the input value, for example for a tombstone record in a compacted topic.
7981
* @param parser the function.
8082
*/
8183
public ParseStringDeserializer(BiFunction<String, Headers, T> parser) {
@@ -100,7 +102,7 @@ public T deserialize(String topic, byte[] data) {
100102

101103
@Override
102104
public T deserialize(String topic, Headers headers, byte[] data) {
103-
return this.parser.apply(new String(data, this.charset), headers);
105+
return this.parser.apply(data == null ? null : new String(data, this.charset), headers);
104106
}
105107

106108
/**

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.junit.jupiter.api.Test;
3030

3131
import org.springframework.kafka.support.serializer.testentities.DummyEntity;
32+
import org.springframework.lang.Nullable;
3233

3334
/**
3435
*
@@ -212,6 +213,13 @@ public void testSerialization_usingHeaders() {
212213
.hasFieldOrPropertyWithValue("intValue", 123);
213214
}
214215

216+
@Test
217+
void nullValue() {
218+
ParseStringDeserializer<Object> deserializer =
219+
new ParseStringDeserializer<>(ToStringSerializationTests::parseWithHeaders);
220+
assertThat(deserializer.deserialize("foo", new RecordHeaders(), null)).isNull();
221+
}
222+
215223
@Test
216224
@DisplayName("Test deserialization using headers via config")
217225
public void testSerialization_usingHeadersViaConfig() {
@@ -272,7 +280,10 @@ public void testSerializationDeserializationWithCharset() {
272280
.isNotEqualTo("tôtô");
273281
}
274282

275-
public static Object parseWithHeaders(String str, Headers headers) {
283+
public static Object parseWithHeaders(@Nullable String str, Headers headers) {
284+
if (str == null) {
285+
return null;
286+
}
276287
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
277288
String entityType = new String(header);
278289

0 commit comments

Comments
 (0)