Skip to content

Commit 8faeb56

Browse files
committed
First commit
0 parents  commit 8faeb56

File tree

8 files changed

+275
-0
lines changed

8 files changed

+275
-0
lines changed

.gitignore

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/target/
2+
!.mvn/wrapper/maven-wrapper.jar
3+
4+
### STS ###
5+
.apt_generated
6+
.classpath
7+
.factorypath
8+
.project
9+
.settings
10+
.springBeans
11+
.sts4-cache
12+
13+
### IntelliJ IDEA ###
14+
.idea
15+
*.iws
16+
*.iml
17+
*.ipr
18+
19+
### NetBeans ###
20+
/nbproject/private/
21+
/build/
22+
/nbbuild/
23+
/dist/
24+
/nbdist/
25+
/.nb-gradle/

docker-compose.yml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
version: '2'
2+
services:
3+
zookeeper:
4+
image: wurstmeister/zookeeper
5+
ports:
6+
- "2181:2181"
7+
kafka:
8+
image: wurstmeister/kafka
9+
ports:
10+
- "9092:9092"
11+
environment:
12+
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
13+
KAFKA_CREATE_TOPICS: "test:1:1"
14+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
15+
volumes:
16+
- /var/run/docker.sock:/var/run/docker.sock

pom.xml

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>io.tpd</groupId>
7+
<artifactId>kafka-example</artifactId>
8+
<version>0.0.1-SNAPSHOT</version>
9+
<packaging>jar</packaging>
10+
11+
<name>kafka-example</name>
12+
<description>Demo project for Spring Boot</description>
13+
14+
<parent>
15+
<groupId>org.springframework.boot</groupId>
16+
<artifactId>spring-boot-starter-parent</artifactId>
17+
<version>2.1.0.RELEASE</version>
18+
<relativePath/> <!-- lookup parent from repository -->
19+
</parent>
20+
21+
<properties>
22+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
23+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
24+
<java.version>10</java.version>
25+
</properties>
26+
27+
<dependencies>
28+
<dependency>
29+
<groupId>org.springframework.boot</groupId>
30+
<artifactId>spring-boot-starter-web</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>org.springframework.kafka</groupId>
34+
<artifactId>spring-kafka</artifactId>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.springframework.boot</groupId>
38+
<artifactId>spring-boot-starter-test</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
<!--<dependency>-->
42+
<!--<groupId>io.projectreactor</groupId>-->
43+
<!--<artifactId>reactor-test</artifactId>-->
44+
<!--<scope>test</scope>-->
45+
<!--</dependency>-->
46+
</dependencies>
47+
48+
<build>
49+
<plugins>
50+
<plugin>
51+
<groupId>org.springframework.boot</groupId>
52+
<artifactId>spring-boot-maven-plugin</artifactId>
53+
</plugin>
54+
</plugins>
55+
</build>
56+
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.tpd.kafkaexample;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.kafka.annotation.KafkaListener;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.web.bind.annotation.GetMapping;
10+
import org.springframework.web.bind.annotation.RestController;
11+
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
15+
@RestController
16+
public class DummyController {
17+
18+
private static final Logger logger = LoggerFactory.getLogger(DummyController.class);
19+
20+
@Autowired
21+
private KafkaTemplate<String, PracticalAdvice> template;
22+
23+
@GetMapping("/hello")
24+
public String hello() throws Exception {
25+
this.template.send("myTopic", new PracticalAdvice("This is a practical advice", 10));
26+
this.template.send("myTopic", new PracticalAdvice("This is a practical advice", 20));
27+
this.template.send("myTopic", new PracticalAdvice("This is a practical advice", 30));
28+
latch.await(60, TimeUnit.SECONDS);
29+
logger.info("All received");
30+
return "Hello!";
31+
}
32+
33+
private final CountDownLatch latch = new CountDownLatch(3);
34+
35+
@KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerStringContainerFactory")
36+
public void listen(ConsumerRecord<String, String> cr) {
37+
logger.info(cr.toString());
38+
latch.countDown();
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package io.tpd.kafkaexample;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.clients.producer.ProducerConfig;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.SpringApplication;
9+
import org.springframework.boot.autoconfigure.SpringBootApplication;
10+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
13+
import org.springframework.kafka.core.*;
14+
import org.springframework.kafka.support.serializer.JsonDeserializer;
15+
import org.springframework.kafka.support.serializer.JsonSerializer;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
@SpringBootApplication
21+
public class KafkaExampleApplication {
22+
23+
public static void main(String[] args) {
24+
SpringApplication.run(KafkaExampleApplication.class, args);
25+
}
26+
27+
@Autowired
28+
private KafkaProperties kafkaProperties;
29+
30+
// Configuration based on https://www.codenotfound.com/spring-kafka-json-serializer-deserializer-example.html
31+
32+
// Producer configuration
33+
34+
@Bean
35+
public Map<String, Object> producerConfigs() {
36+
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
37+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
38+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
39+
40+
return props;
41+
}
42+
43+
@Bean
44+
public ProducerFactory<String, PracticalAdvice> producerFactory() {
45+
return new DefaultKafkaProducerFactory<>(producerConfigs());
46+
}
47+
48+
@Bean
49+
public KafkaTemplate<String, PracticalAdvice> kafkaTemplate() {
50+
return new KafkaTemplate<>(producerFactory());
51+
}
52+
53+
// Consumer configuration
54+
55+
@Bean
56+
public Map<String, Object> consumerConfigs() {
57+
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
58+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
59+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
60+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
61+
62+
return props;
63+
}
64+
65+
@Bean
66+
public ConsumerFactory<String, PracticalAdvice> consumerFactory() {
67+
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
68+
new JsonDeserializer<>(PracticalAdvice.class));
69+
}
70+
71+
@Bean
72+
public ConsumerFactory<String, String> stringConsumerFactory() {
73+
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
74+
new StringDeserializer());
75+
}
76+
77+
@Bean
78+
public ConcurrentKafkaListenerContainerFactory<String, PracticalAdvice> kafkaListenerContainerFactory() {
79+
ConcurrentKafkaListenerContainerFactory<String, PracticalAdvice> factory =
80+
new ConcurrentKafkaListenerContainerFactory<>();
81+
factory.setConsumerFactory(consumerFactory());
82+
83+
return factory;
84+
}
85+
86+
@Bean
87+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory() {
88+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
89+
new ConcurrentKafkaListenerContainerFactory<>();
90+
factory.setConsumerFactory(stringConsumerFactory());
91+
92+
return factory;
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.tpd.kafkaexample;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
5+
public class PracticalAdvice {
6+
private final String message;
7+
private final int importance;
8+
9+
public PracticalAdvice(@JsonProperty("message") final String message, @JsonProperty("importance") final int importance) {
10+
this.message = message;
11+
this.importance = importance;
12+
}
13+
14+
public String getMessage() {
15+
return message;
16+
}
17+
18+
public int getImportance() {
19+
return importance;
20+
}
21+
}

src/main/resources/application.yml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
spring:
2+
kafka:
3+
consumer:
4+
group-id: foo
5+
auto-offset-reset: earliest
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.tpd.kafkaexample;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.springframework.boot.test.context.SpringBootTest;
6+
import org.springframework.test.context.junit4.SpringRunner;
7+
8+
@RunWith(SpringRunner.class)
9+
@SpringBootTest
10+
public class KafkaExampleApplicationTests {
11+
12+
@Test
13+
public void contextLoads() {
14+
}
15+
16+
}

0 commit comments

Comments
 (0)