Skip to content

Commit e7d5653

Browse files
committed
add kafka-spring producer/consumer basic examples
1 parent b608689 commit e7d5653

File tree

8 files changed

+313
-0
lines changed

8 files changed

+313
-0
lines changed

kafka-spring-consumer/pom.xml

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>kafka-examples</artifactId>
7+
<groupId>com.github.fhuss</groupId>
8+
<version>1.0.0</version>
9+
</parent>
10+
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>kafka-spring-consumer</artifactId>
14+
15+
<properties>
16+
<spring.kafka.version>2.1.7.RELEASE</spring.kafka.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-clients</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.springframework.kafka</groupId>
26+
<artifactId>spring-kafka</artifactId>
27+
<version>${spring.kafka.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>org.springframework.boot</groupId>
31+
<artifactId>spring-boot-starter</artifactId>
32+
<version>2.0.3.RELEASE</version>
33+
</dependency>
34+
</dependencies>
35+
36+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.github.fhuss.kafka.spring;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
4+
import org.apache.kafka.common.TopicPartition;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.boot.ApplicationArguments;
7+
import org.springframework.boot.ApplicationRunner;
8+
import org.springframework.boot.SpringApplication;
9+
import org.springframework.boot.autoconfigure.SpringBootApplication;
10+
import org.springframework.kafka.core.ConsumerFactory;
11+
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
12+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
13+
import org.springframework.kafka.listener.MessageListener;
14+
import org.springframework.kafka.listener.config.ContainerProperties;
15+
16+
import java.util.Collection;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
19+
20+
@SpringBootApplication
21+
public class ConsumerApplicationRunner implements ApplicationRunner {
22+
23+
private final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(4);
24+
25+
@Autowired
26+
private ConsumerFactory<String, String> cf;
27+
28+
public static void main(String[] args) {
29+
SpringApplication.run(ConsumerApplicationRunner.class, args).close();
30+
}
31+
32+
@Override
33+
public void run(ApplicationArguments args) throws Exception {
34+
35+
ContainerProperties containerProperties = new ContainerProperties("testing");
36+
containerProperties.setConsumerRebalanceListener(consumerRebalanceListener());
37+
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
38+
containerProperties.setPollTimeout(Integer.MAX_VALUE);
39+
40+
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
41+
System.out.printf("[%s] Consumed new message : key=%s, value=%s, partition=%s, offset=%s\n", Thread.currentThread().getName(),
42+
record.key(),
43+
record.value(),
44+
record.partition(),
45+
record.offset());
46+
COUNT_DOWN_LATCH.countDown();
47+
});
48+
49+
KafkaMessageListenerContainer<String, String> container =
50+
new KafkaMessageListenerContainer<>(cf, containerProperties);
51+
52+
container.setClientIdSuffix("spring-consumer-container");
53+
container.start();
54+
55+
if (COUNT_DOWN_LATCH.await(1, TimeUnit.MINUTES)) {
56+
System.out.println("Messages received");
57+
} else {
58+
System.out.println("Timeout before receiving all messages");
59+
}
60+
container.stop(new Runnable() {
61+
@Override
62+
public void run() {
63+
64+
}
65+
});
66+
}
67+
68+
private ConsumerRebalanceListener consumerRebalanceListener() {
69+
return new ConsumerRebalanceListener() {
70+
@Override
71+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
72+
System.out.println("Revoked : " + partitions);
73+
}
74+
75+
@Override
76+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
77+
System.out.println("Assigned : " + partitions);
78+
}
79+
};
80+
}
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.github.fhuss.kafka.spring;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.clients.producer.ProducerConfig;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.kafka.annotation.EnableKafka;
11+
import org.springframework.kafka.core.ConsumerFactory;
12+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
13+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
14+
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.core.ProducerFactory;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.UUID;
20+
21+
@Configuration
22+
@EnableKafka
23+
public class ConsumerConfiguration {
24+
25+
@Value("${bootstrap.server:localhost:9092}")
26+
private String bootstrapServer;
27+
28+
@Value("${consumer.group.id:#{null}}")
29+
private String consumerGroupId;
30+
31+
32+
@Bean
33+
public ConsumerFactory<String, String> kafkaListenerContainerFactory() {
34+
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfigs());
35+
return cf;
36+
}
37+
38+
/**
39+
* The Kafka Consumer configuration.
40+
*/
41+
public Map<String, Object> consumerConfigs() {
42+
Map<String, Object> props = new HashMap<>();
43+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
44+
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId != null ? consumerGroupId : ConsumerGroupIdGenerator.generate());
45+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
46+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
47+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
48+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
49+
return props;
50+
}
51+
52+
static class ConsumerGroupIdGenerator {
53+
54+
static String generate() {
55+
return "spring-consumer-group" + UUID.randomUUID().toString();
56+
}
57+
}
58+
}

kafka-spring-producer/pom.xml

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>kafka-examples</artifactId>
7+
<groupId>com.github.fhuss</groupId>
8+
<version>1.0.0</version>
9+
</parent>
10+
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>kafka-spring-producer</artifactId>
14+
15+
<properties>
16+
<spring.kafka.version>2.1.7.RELEASE</spring.kafka.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.apache.kafka</groupId>
22+
<artifactId>kafka-clients</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.springframework.kafka</groupId>
26+
<artifactId>spring-kafka</artifactId>
27+
<version>${spring.kafka.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>org.springframework.boot</groupId>
31+
<artifactId>spring-boot-starter</artifactId>
32+
<version>2.0.3.RELEASE</version>
33+
</dependency>
34+
</dependencies>
35+
36+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.github.fhuss.kafka.spring;
2+
3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.boot.ApplicationArguments;
5+
import org.springframework.boot.ApplicationRunner;
6+
import org.springframework.boot.SpringApplication;
7+
import org.springframework.boot.autoconfigure.SpringBootApplication;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.kafka.support.SendResult;
10+
import org.springframework.util.concurrent.ListenableFuture;
11+
import org.springframework.util.concurrent.ListenableFutureCallback;
12+
13+
@SpringBootApplication
14+
public class ProducerApplicationRunner implements ApplicationRunner {
15+
16+
@Autowired
17+
private KafkaTemplate<String, String> template;
18+
19+
public static void main(String[] args) {
20+
SpringApplication.run(ProducerApplicationRunner.class, args).close();
21+
}
22+
23+
@Override
24+
public void run(ApplicationArguments args) {
25+
sendRecordToTopic("testing", "I");
26+
sendRecordToTopic("testing", "Heart");
27+
sendRecordToTopic("testing", "Logs");
28+
sendRecordToTopic("testing", "Event Data");
29+
sendRecordToTopic("testing", "Stream Processing");
30+
sendRecordToTopic("testing", "and Data Integration");
31+
this.template.flush();
32+
}
33+
34+
private void sendRecordToTopic(String topic, String value) {
35+
ListenableFuture<SendResult<String, String>> future = this.template.send(topic, value);
36+
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
37+
@Override
38+
public void onSuccess(SendResult<String, String> result) {
39+
System.out.println(result);
40+
}
41+
42+
@Override
43+
public void onFailure(Throwable ex) {
44+
System.err.println(ex.getMessage());
45+
}
46+
47+
});
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.github.fhuss.kafka.spring;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.common.serialization.StringSerializer;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.core.ProducerFactory;
11+
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
@Configuration
16+
public class ProducerConfiguration {
17+
18+
@Value("${bootstrap.server:localhost:9092}")
19+
private String bootstrapServer;
20+
21+
/**
22+
* Create a new {@link KafkaTemplate} used to send records.
23+
* @return a new {@link KafkaTemplate} instance.
24+
*/
25+
@Bean
26+
public KafkaTemplate<String, String> createTemplate() {
27+
final Map<String, Object> props = producerConfigs();
28+
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
29+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
30+
return template;
31+
}
32+
33+
/**
34+
* The Kafka Producer configuration.
35+
*/
36+
public Map<String, Object> producerConfigs() {
37+
Map<String, Object> props = new HashMap<>();
38+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
39+
props.put(ProducerConfig.ACKS_CONFIG, "1");
40+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
41+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
42+
return props;
43+
}
44+
}

pom.xml

+8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
<module>producer-interceptor</module>
1111
<module>producer-transactional</module>
1212
<module>consumer-interceptor</module>
13+
<module>kafka-spring-producer</module>
14+
<module>kafka-spring-consumer</module>
1315
</modules>
1416

1517
<groupId>com.github.fhuss</groupId>
@@ -92,6 +94,12 @@
9294
<scope>test</scope>
9395
</dependency>
9496

97+
<dependency>
98+
<groupId>org.apache.kafka</groupId>
99+
<artifactId>kafka-streams</artifactId>
100+
<version>2.1.0-SNAPSHOT</version>
101+
</dependency>
102+
95103
</dependencies>
96104
</dependencyManagement>
97105
</project>

producer/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@
2020
<groupId>org.apache.kafka</groupId>
2121
<artifactId>kafka-clients</artifactId>
2222
</dependency>
23+
2324
</dependencies>
2425
</project>

0 commit comments

Comments
 (0)