Skip to content

Commit dae8a42

Browse files
Wzy19930507artembilan
authored andcommitted
Fix ProducerInterceptor usage in KafkaTempalte
The result of `ProducerInterceptor.onSend()` is out of use **Cherry-pick to `3.0.x`** (cherry picked from commit c5c93ad)
1 parent 0d88e26 commit dae8a42

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,7 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
762762
throw ex;
763763
}
764764
}
765+
765766
/**
766767
* Send the producer record.
767768
* @param producerRecord the producer record.
@@ -779,11 +780,9 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
779780
if (this.micrometerHolder != null) {
780781
sample = this.micrometerHolder.start();
781782
}
782-
if (this.producerInterceptor != null) {
783-
this.producerInterceptor.onSend(producerRecord);
784-
}
783+
ProducerRecord<K, V> interceptedRecord = interceptorProducerRecord(producerRecord);
785784
Future<RecordMetadata> sendFuture =
786-
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
785+
producer.send(interceptedRecord, buildCallback(interceptedRecord, producer, future, sample, observation));
787786
// Maybe an immediate failure
788787
if (sendFuture.isDone()) {
789788
try {
@@ -800,10 +799,17 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
800799
if (this.autoFlush) {
801800
flush();
802801
}
803-
this.logger.trace(() -> "Sent: " + KafkaUtils.format(producerRecord));
802+
this.logger.trace(() -> "Sent: " + KafkaUtils.format(interceptedRecord));
804803
return future;
805804
}
806805

806+
private ProducerRecord<K, V> interceptorProducerRecord(ProducerRecord<K, V> producerRecord) {
807+
if (this.producerInterceptor != null) {
808+
return this.producerInterceptor.onSend(producerRecord);
809+
}
810+
return producerRecord;
811+
}
812+
807813
private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
808814
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample, Observation observation) {
809815

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -559,19 +559,21 @@ void testFutureFailureOnSend() {
559559

560560
@SuppressWarnings("unchecked")
561561
@Test
562-
void testProducerInterceptorManagedOnKafkaTemplate() {
562+
void testProducerInterceptorManagedOnKafkaTemplate() throws Exception {
563563

564564
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
565565
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
566566
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
567567
ProducerInterceptor<Integer, String> producerInterceptor = Mockito.mock(ProducerInterceptor.class);
568+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-1", "bar")).given(producerInterceptor).onSend(any());
568569
template.setProducerInterceptor(producerInterceptor);
569570

570571
template.setDefaultTopic("prod-interceptor-test-1");
571-
template.sendDefault("foo");
572+
CompletableFuture<SendResult<Integer, String>> resultCompletableFuture = template.sendDefault("foo");
572573

573574
verify(producerInterceptor, times(1)).onSend(any(ProducerRecord.class));
574575
verify(producerInterceptor, times(1)).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
576+
assertThat(resultCompletableFuture.get(10, TimeUnit.SECONDS).getProducerRecord()).isEqualTo(new ProducerRecord<>("prod-interceptor-test-1", "bar"));
575577
}
576578

577579
@SuppressWarnings("unchecked")
@@ -591,13 +593,15 @@ void testProducerInterceptorNotSetOnKafkaTemplateNotInvoked() {
591593

592594
@SuppressWarnings("unchecked")
593595
@Test
594-
void testCompositeProducerInterceptor() {
596+
void testCompositeProducerInterceptor() throws Exception {
595597

596598
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
597599
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
598600
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
599601
ProducerInterceptor<Integer, String> producerInterceptor1 = Mockito.mock(ProducerInterceptor.class);
600602
ProducerInterceptor<Integer, String> producerInterceptor2 = Mockito.mock(ProducerInterceptor.class);
603+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-3", "bar")).given(producerInterceptor1).onSend(any());
604+
willAnswer(inv -> new ProducerRecord<>("prod-interceptor-test-3", "baz")).given(producerInterceptor2).onSend(any());
601605
CompositeProducerInterceptor<Integer, String> compositeProducerInterceptor =
602606
new CompositeProducerInterceptor<>(producerInterceptor1, producerInterceptor2);
603607
template.setProducerInterceptor(compositeProducerInterceptor);
@@ -606,14 +610,15 @@ void testCompositeProducerInterceptor() {
606610
doReturn(mockProducerRecord).when(producerInterceptor1).onSend(any(ProducerRecord.class));
607611

608612
template.setDefaultTopic("prod-interceptor-test-3");
609-
template.sendDefault("foo");
613+
CompletableFuture<SendResult<Integer, String>> result = template.sendDefault("foo");
610614

611615
InOrder inOrder = inOrder(producerInterceptor1, producerInterceptor2);
612616

613617
inOrder.verify(producerInterceptor1).onSend(any(ProducerRecord.class));
614618
inOrder.verify(producerInterceptor2).onSend(any(ProducerRecord.class));
615619
inOrder.verify(producerInterceptor1).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
616620
inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull());
621+
assertThat(result.get(10, TimeUnit.SECONDS).getProducerRecord()).isEqualTo(new ProducerRecord<>("prod-interceptor-test-3", "baz"));
617622
}
618623

619624
@ParameterizedTest(name = "{0} is invalid")

0 commit comments

Comments
 (0)