Skip to content

Commit 0d10899

Browse files
codebase/synchronous-communication-with-apache-kafka-using-replyingkafkatemplate [BAEL-6137] (#18405)
* add codebase * add test case * add missing dependency * minor updates * add test scope to test dependency * remove test application * remove reply topic name in @sendto * remove @EnableKafka * upgrade kafka docker version * add validation on kafka configuration properties * specify package explicitly in trusted packages * upgrade spring-boot version * add controller for local testing
1 parent 6d0668c commit 0d10899

13 files changed

+274
-11
lines changed

spring-kafka-4/pom.xml

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,26 @@
1717
<dependencies>
1818
<dependency>
1919
<groupId>org.springframework.boot</groupId>
20-
<artifactId>spring-boot-starter</artifactId>
20+
<artifactId>spring-boot-starter-web</artifactId>
2121
</dependency>
2222
<dependency>
2323
<groupId>org.springframework.boot</groupId>
24-
<artifactId>spring-boot-starter-web</artifactId>
24+
<artifactId>spring-boot-starter-validation</artifactId>
2525
</dependency>
2626
<dependency>
2727
<groupId>org.springframework.kafka</groupId>
2828
<artifactId>spring-kafka</artifactId>
29-
<version>${spring-kafka.version}</version>
3029
</dependency>
30+
31+
<!-- test dependencies -->
3132
<dependency>
3233
<groupId>org.springframework.boot</groupId>
33-
<artifactId>spring-boot-starter-webflux</artifactId>
34+
<artifactId>spring-boot-starter-test</artifactId>
35+
<scope>test</scope>
3436
</dependency>
3537
<dependency>
3638
<groupId>org.springframework.boot</groupId>
37-
<artifactId>spring-boot-starter-test</artifactId>
39+
<artifactId>spring-boot-testcontainers</artifactId>
3840
<scope>test</scope>
3941
</dependency>
4042
<dependency>
@@ -52,10 +54,6 @@
5254
<artifactId>junit-jupiter</artifactId>
5355
<scope>test</scope>
5456
</dependency>
55-
<dependency>
56-
<groupId>com.fasterxml.jackson.core</groupId>
57-
<artifactId>jackson-databind</artifactId>
58-
</dependency>
5957
</dependencies>
6058

6159
<build>
@@ -72,8 +70,7 @@
7270

7371
<properties>
7472
<java.version>21</java.version>
75-
<spring-kafka.version>3.1.2</spring-kafka.version>
76-
<spring-boot.version>3.2.2</spring-boot.version>
73+
<spring-boot.version>3.4.4</spring-boot.version>
7774
</properties>
7875

7976
</project>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
import org.springframework.context.annotation.PropertySource;
6+
7+
@SpringBootApplication
8+
@PropertySource("classpath:application-synchronous-kafka.properties")
9+
class Application {
10+
11+
public static void main(String[] args) {
12+
SpringApplication.run(Application.class, args);
13+
}
14+
15+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.core.ConsumerFactory;
10+
import org.springframework.kafka.core.ProducerFactory;
11+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
12+
import org.springframework.kafka.listener.ContainerProperties;
13+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
14+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
15+
16+
import java.time.Duration;
17+
18+
@Configuration
19+
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
20+
class KafkaConfiguration {
21+
22+
private final SynchronousKafkaProperties synchronousKafkaProperties;
23+
24+
KafkaConfiguration(SynchronousKafkaProperties synchronousKafkaProperties) {
25+
this.synchronousKafkaProperties = synchronousKafkaProperties;
26+
}
27+
28+
@Bean
29+
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
30+
ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
31+
) {
32+
String replyTopic = synchronousKafkaProperties.replyTopic();
33+
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
34+
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
35+
}
36+
37+
@Bean
38+
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
39+
ProducerFactory<String, NotificationDispatchRequest> producerFactory,
40+
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
41+
) {
42+
Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
43+
var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
44+
replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
45+
return replyingKafkaTemplate;
46+
}
47+
48+
@Bean
49+
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
50+
return new KafkaTemplate<>(producerFactory);
51+
}
52+
53+
@Bean
54+
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
55+
ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
56+
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
57+
) {
58+
var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
59+
factory.setConsumerFactory(consumerFactory);
60+
factory.setReplyTemplate(kafkaTemplate);
61+
return factory;
62+
}
63+
64+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.http.ResponseEntity;
4+
import org.springframework.web.bind.annotation.PostMapping;
5+
import org.springframework.web.bind.annotation.RequestBody;
6+
import org.springframework.web.bind.annotation.RequestMapping;
7+
import org.springframework.web.bind.annotation.RestController;
8+
9+
import java.util.concurrent.ExecutionException;
10+
11+
@RestController
12+
@RequestMapping("/api/v1")
13+
class NotificationDispatchController {
14+
15+
private final NotificationDispatchService notificationDispatchService;
16+
17+
NotificationDispatchController(NotificationDispatchService notificationDispatchService) {
18+
this.notificationDispatchService = notificationDispatchService;
19+
}
20+
21+
@PostMapping(value = "/notification")
22+
ResponseEntity<NotificationDispatchResponse> dispatch(
23+
@RequestBody NotificationDispatchRequest notificationDispatchRequest
24+
) throws ExecutionException, InterruptedException {
25+
NotificationDispatchResponse response = notificationDispatchService.dispatch(notificationDispatchRequest);
26+
return ResponseEntity.ok(response);
27+
}
28+
29+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.kafka.annotation.KafkaListener;
4+
import org.springframework.messaging.handler.annotation.SendTo;
5+
import org.springframework.stereotype.Component;
6+
7+
import java.util.UUID;
8+
9+
@Component
10+
class NotificationDispatchListener {
11+
12+
@SendTo
13+
@KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
14+
NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
15+
// ... processing logic
16+
UUID notificationId = UUID.randomUUID();
17+
return new NotificationDispatchResponse(notificationId);
18+
}
19+
20+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
record NotificationDispatchRequest(String emailId, String content) {
4+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import java.util.UUID;
4+
5+
public record NotificationDispatchResponse(UUID notificationId) {
6+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.apache.kafka.clients.producer.ProducerRecord;
4+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
5+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
6+
import org.springframework.stereotype.Service;
7+
8+
import java.util.concurrent.ExecutionException;
9+
10+
@Service
11+
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
12+
class NotificationDispatchService {
13+
14+
private final SynchronousKafkaProperties synchronousKafkaProperties;
15+
private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;
16+
17+
NotificationDispatchService(SynchronousKafkaProperties synchronousKafkaProperties, ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate) {
18+
this.synchronousKafkaProperties = synchronousKafkaProperties;
19+
this.replyingKafkaTemplate = replyingKafkaTemplate;
20+
}
21+
22+
NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) throws ExecutionException, InterruptedException {
23+
String requestTopic = synchronousKafkaProperties.requestTopic();
24+
ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
25+
26+
var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
27+
return requestReplyFuture.get().value();
28+
}
29+
30+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import jakarta.validation.constraints.NotBlank;
4+
import jakarta.validation.constraints.NotNull;
5+
import org.hibernate.validator.constraints.time.DurationMax;
6+
import org.hibernate.validator.constraints.time.DurationMin;
7+
import org.springframework.boot.context.properties.ConfigurationProperties;
8+
import org.springframework.validation.annotation.Validated;
9+
10+
import java.time.Duration;
11+
12+
@Validated
13+
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
14+
record SynchronousKafkaProperties(
15+
@NotBlank
16+
String requestTopic,
17+
18+
@NotBlank
19+
String replyTopic,
20+
21+
@NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
22+
Duration replyTimeout
23+
) {
24+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS}
2+
3+
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
4+
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
5+
spring.kafka.consumer.group-id=synchronous-kafka-group
6+
spring.kafka.consumer.properties.spring.json.trusted.packages=com.baeldung.kafka.synchronous
7+
8+
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
9+
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
10+
11+
spring.kafka.properties.allow.auto.create.topics=true
12+
13+
com.baeldung.kafka.synchronous.request-topic=notification-dispatch-request
14+
com.baeldung.kafka.synchronous.reply-topic=notification-dispatch-response
15+
com.baeldung.kafka.synchronous.reply-timeout=30s
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.boot.test.context.SpringBootTest;
6+
import org.springframework.context.annotation.Import;
7+
8+
import java.util.concurrent.ExecutionException;
9+
10+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
11+
12+
@SpringBootTest
13+
@Import(TestcontainersConfiguration.class)
14+
class SynchronousKafkaLiveTest {
15+
16+
@Autowired
17+
private NotificationDispatchService notificationDispatchService;
18+
19+
@Test
20+
void whenNotificationRequestSent_thenReplyReceived() throws ExecutionException, InterruptedException {
21+
NotificationDispatchRequest request = new NotificationDispatchRequest("[email protected]", "test-content");
22+
23+
NotificationDispatchResponse response = notificationDispatchService.dispatch(request);
24+
25+
assertThat(response.notificationId())
26+
.isNotNull();
27+
}
28+
29+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.boot.SpringApplication;
4+
5+
class TestApplication {
6+
7+
public static void main(String[] args) {
8+
SpringApplication.from(Application::main)
9+
.with(TestcontainersConfiguration.class)
10+
.run(args);
11+
}
12+
13+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.baeldung.kafka.synchronous;
2+
3+
import org.springframework.boot.test.context.TestConfiguration;
4+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
5+
import org.springframework.context.annotation.Bean;
6+
import org.testcontainers.kafka.KafkaContainer;
7+
8+
@TestConfiguration(proxyBeanMethods = false)
9+
class TestcontainersConfiguration {
10+
11+
@Bean
12+
@ServiceConnection
13+
KafkaContainer kafkaContainer() {
14+
return new KafkaContainer("apache/kafka:4.0.0");
15+
}
16+
17+
}

0 commit comments

Comments
 (0)