Skip to content

Commit eb77967

Browse files
authored
Private Header Type for DeserializationExceptions
Use a package-private header for deserialization exceptions. **cherry-pick to 2.9.x**
1 parent edecea0 commit eb77967

File tree

12 files changed

+267
-45
lines changed

12 files changed

+267
-45
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4671,10 +4671,15 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M
46714671
Thing thing = in.get(i);
46724672
if (thing == null
46734673
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
4674-
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
4675-
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
4676-
if (deserEx != null) {
4677-
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
4674+
try {
4675+
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
4676+
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
4677+
if (deserEx != null) {
4678+
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
4679+
}
4680+
}
4681+
catch (Exception ex) {
4682+
logger.error(ex, "Record at index " + i + " could not be deserialized");
46784683
}
46794684
throw new BatchListenerFailedException("Deserialization", deserEx, i);
46804685
}
@@ -4684,9 +4689,9 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M
46844689
----
46854690
====
46864691

4687-
`ListenerUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.
4692+
`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.
46884693

4689-
When consuming `List<ConsumerRecord<?, ?>`, `ListenerUtils.getExceptionFromHeader()` is used instead:
4694+
When consuming `List<ConsumerRecord<?, ?>`, `SerializationUtils.getExceptionFromHeader()` is used instead:
46904695

46914696
====
46924697
[source, java]
@@ -4696,7 +4701,7 @@ void listen(List<ConsumerRecord<String, Thing>> in) {
46964701
for (int i = 0; i < in.size(); i++) {
46974702
ConsumerRecord<String, Thing> rec = in.get(i);
46984703
if (rec.value() == null) {
4699-
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
4704+
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
47004705
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
47014706
if (deserEx != null) {
47024707
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,9 +506,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
506506
if (consumer != null && this.verifyPartition) {
507507
tp = checkPartition(tp, consumer);
508508
}
509-
DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
509+
DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(record,
510510
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
511-
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
511+
DeserializationException kDeserEx = SerializationUtils.getExceptionFromHeader(record,
512512
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
513513
Headers headers = new RecordHeaders(record.headers().toArray());
514514
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2985,7 +2985,8 @@ private void fixStackTrace(Exception ex, Exception toHandle) {
29852985
}
29862986

29872987
public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
2988-
DeserializationException exception = ListenerUtils.getExceptionFromHeader(cRecord, headerName, this.logger);
2988+
DeserializationException exception = SerializationUtils.getExceptionFromHeader(cRecord, headerName,
2989+
this.logger);
29892990
if (exception != null) {
29902991
/*
29912992
* Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@
2424

2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
27-
import org.apache.kafka.common.header.Header;
28-
import org.apache.kafka.common.header.Headers;
29-
import org.apache.kafka.common.header.internals.RecordHeaders;
3027

3128
import org.springframework.core.log.LogAccessor;
3229
import org.springframework.kafka.support.serializer.DeserializationException;
30+
import org.springframework.kafka.support.serializer.SerializationUtils;
3331
import org.springframework.lang.Nullable;
3432
import org.springframework.util.Assert;
3533
import org.springframework.util.backoff.BackOff;
@@ -92,23 +90,15 @@ else if (listener instanceof GenericMessageListener) {
9290
* @param logger the logger for logging errors.
9391
* @return the exception or null.
9492
* @since 2.3
93+
* @deprecated in favor of
94+
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}.
9595
*/
96+
@Deprecated
9697
@Nullable
9798
public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
9899
String headerName, LogAccessor logger) {
99100

100-
Header header = record.headers().lastHeader(headerName);
101-
if (header != null) {
102-
byte[] value = header.value();
103-
DeserializationException exception = byteArrayToDeserializationException(logger, value);
104-
if (exception != null) {
105-
Headers headers = new RecordHeaders(record.headers().toArray());
106-
headers.remove(headerName);
107-
exception.setHeaders(headers);
108-
}
109-
return exception;
110-
}
111-
return null;
101+
return SerializationUtils.getExceptionFromHeader(record, headerName, logger);
112102
}
113103

114104
/**
@@ -118,7 +108,11 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
118108
* @param value the bytes.
119109
* @return the exception or null if deserialization fails.
120110
* @since 2.8.1
111+
* @deprecated in favor of
112+
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} or
113+
* {@link SerializationUtils#byteArrayToDeserializationException(LogAccessor, org.apache.kafka.common.header.Header)}.
121114
*/
115+
@Deprecated
122116
@Nullable
123117
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) {
124118
try {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,6 @@
4848
import org.springframework.kafka.listener.ConsumerSeekAware;
4949
import org.springframework.kafka.listener.ContainerProperties;
5050
import org.springframework.kafka.listener.GenericMessageListenerContainer;
51-
import org.springframework.kafka.listener.ListenerUtils;
5251
import org.springframework.kafka.support.KafkaHeaders;
5352
import org.springframework.kafka.support.KafkaUtils;
5453
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -558,7 +557,7 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
558557
* Return a {@link DeserializationException} if either the key or value failed
559558
* deserialization; null otherwise. If you need to determine whether it was the key or
560559
* value, call
561-
* {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
560+
* {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
562561
* with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and
563562
* {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
564563
* @param record the record.
@@ -568,14 +567,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) {
568567
*/
569568
@Nullable
570569
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
571-
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
570+
DeserializationException exception = SerializationUtils.getExceptionFromHeader(record,
572571
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
573572
if (exception != null) {
574573
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
575574
+ record.partition() + "@" + record.offset());
576575
return exception;
577576
}
578-
exception = ListenerUtils.getExceptionFromHeader(record,
577+
exception = SerializationUtils.getExceptionFromHeader(record,
579578
SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
580579
if (exception != null) {
581580
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import org.apache.kafka.common.header.internals.RecordHeader;
20+
21+
/**
22+
* A package-protected header used to contain serialized
23+
* {@link DeserializationException}s. Only headers of this type will be examined for
24+
* deserialization.
25+
*
26+
* @author Gary Russell
27+
* @since 2.9.11
28+
*/
29+
class DeserializationExceptionHeader extends RecordHeader {
30+
31+
/**
32+
* Construct an instance with the provided properties.
33+
* @param key the key.
34+
* @param value the value;
35+
*/
36+
DeserializationExceptionHeader(String key, byte[] value) {
37+
super(key, value);
38+
}
39+
40+
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,16 +16,24 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.io.ByteArrayInputStream;
1920
import java.io.ByteArrayOutputStream;
2021
import java.io.IOException;
22+
import java.io.ObjectInputStream;
2123
import java.io.ObjectOutputStream;
24+
import java.io.ObjectStreamClass;
2225
import java.lang.reflect.InvocationTargetException;
2326
import java.lang.reflect.Method;
2427
import java.util.function.BiFunction;
2528

29+
import org.apache.kafka.clients.consumer.ConsumerRecord;
30+
import org.apache.kafka.common.header.Header;
2631
import org.apache.kafka.common.header.Headers;
27-
import org.apache.kafka.common.header.internals.RecordHeader;
32+
import org.apache.kafka.common.header.internals.RecordHeaders;
2833

34+
import org.springframework.core.log.LogAccessor;
35+
import org.springframework.kafka.support.KafkaUtils;
36+
import org.springframework.lang.Nullable;
2937
import org.springframework.util.Assert;
3038
import org.springframework.util.ClassUtils;
3139

@@ -166,10 +174,82 @@ data, isForKeyArg, new RuntimeException("Could not serialize type "
166174
}
167175
}
168176
headers.add(
169-
new RecordHeader(isForKeyArg
177+
new DeserializationExceptionHeader(isForKeyArg
170178
? KEY_DESERIALIZER_EXCEPTION_HEADER
171179
: VALUE_DESERIALIZER_EXCEPTION_HEADER,
172180
stream.toByteArray()));
173181
}
174182

183+
/**
184+
* Extract a {@link DeserializationException} from the supplied header name, if
185+
* present.
186+
* @param record the consumer record.
187+
* @param headerName the header name.
188+
* @param logger the logger for logging errors.
189+
* @return the exception or null.
190+
* @since 2.9.11
191+
*/
192+
@Nullable
193+
public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
194+
String headerName, LogAccessor logger) {
195+
196+
Header header = record.headers().lastHeader(headerName);
197+
if (!(header instanceof DeserializationExceptionHeader)) {
198+
logger.warn(
199+
() -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?",
200+
KafkaUtils.format(record)));
201+
return null;
202+
}
203+
if (header != null) {
204+
byte[] value = header.value();
205+
DeserializationException exception = byteArrayToDeserializationException(logger, header);
206+
if (exception != null) {
207+
Headers headers = new RecordHeaders(record.headers().toArray());
208+
headers.remove(headerName);
209+
exception.setHeaders(headers);
210+
}
211+
return exception;
212+
}
213+
return null;
214+
}
215+
216+
/**
217+
* Convert a byte array containing a serialized {@link DeserializationException} to the
218+
* {@link DeserializationException}.
219+
* @param logger a log accessor to log errors.
220+
* @param header the header.
221+
* @return the exception or null if deserialization fails.
222+
* @since 2.9.11
223+
*/
224+
@Nullable
225+
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, Header header) {
226+
227+
if (!(header instanceof DeserializationExceptionHeader)) {
228+
throw new IllegalStateException("Foreign deserialization exception header ignored; possible attack?");
229+
}
230+
try {
231+
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value())) {
232+
233+
boolean first = true;
234+
235+
@Override
236+
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
237+
if (this.first) {
238+
this.first = false;
239+
Assert.state(desc.getName().equals(DeserializationException.class.getName()),
240+
"Header does not contain a DeserializationException");
241+
}
242+
return super.resolveClass(desc);
243+
}
244+
245+
246+
};
247+
return (DeserializationException) ois.readObject();
248+
}
249+
catch (IOException | ClassNotFoundException | ClassCastException e) {
250+
logger.error(e, "Failed to deserialize a deserialization exception");
251+
return null;
252+
}
253+
}
254+
175255
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.springframework.kafka.support.SendResult;
7474
import org.springframework.kafka.support.converter.ConversionException;
7575
import org.springframework.kafka.support.serializer.DeserializationException;
76+
import org.springframework.kafka.support.serializer.SerializationTestUtils;
7677
import org.springframework.kafka.support.serializer.SerializationUtils;
7778
import org.springframework.kafka.test.utils.KafkaTestUtils;
7879

@@ -172,8 +173,10 @@ void valueHeaderStripped() {
172173
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
173174
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
174175
Headers headers = new RecordHeaders();
175-
headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false)));
176-
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
176+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
177+
header(false)));
178+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
179+
header(true)));
177180
Headers custom = new RecordHeaders();
178181
custom.add(new RecordHeader("foo", "bar".getBytes()));
179182
recoverer.setHeadersFunction((rec, ex) -> custom);
@@ -202,7 +205,8 @@ void keyHeaderStripped() {
202205
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
203206
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
204207
Headers headers = new RecordHeaders();
205-
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
208+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
209+
header(true)));
206210
CompletableFuture future = new CompletableFuture();
207211
future.complete(new Object());
208212
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -222,8 +226,8 @@ void keyDeserOnly() {
222226
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
223227
Headers headers = new RecordHeaders();
224228
DeserializationException deserEx = createDeserEx(true);
225-
headers.add(
226-
new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx)));
229+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
230+
header(true, deserEx)));
227231
CompletableFuture future = new CompletableFuture();
228232
future.complete(new Object());
229233
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -245,8 +249,10 @@ void headersNotStripped() {
245249
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
246250
recoverer.setRetainExceptionHeader(true);
247251
Headers headers = new RecordHeaders();
248-
headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false)));
249-
headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true)));
252+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
253+
header(false)));
254+
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
255+
header(true)));
250256
CompletableFuture future = new CompletableFuture();
251257
future.complete(new Object());
252258
willReturn(future).given(template).send(any(ProducerRecord.class));

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -129,8 +129,8 @@ public String deserialize(String topic, Headers headers, byte[] data) {
129129
ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new MyDes());
130130
Headers headers = new RecordHeaders();
131131
ehd.deserialize("foo", headers, new byte[1]);
132-
DeserializationException dex = ListenerUtils.byteArrayToDeserializationException(null,
133-
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER).value());
132+
DeserializationException dex = SerializationUtils.byteArrayToDeserializationException(null,
133+
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
134134
assertThat(dex.getCause().getMessage())
135135
.contains("Could not serialize")
136136
.contains("original exception message");

0 commit comments

Comments
 (0)