
Commit ac987b2
Author: pk635890
Message: Flink Kafka connector examples
Parent: 2dc8d74

File tree

KafkaRecord.java
Main1.java
Main2.java
Main3.java
MySchema.java
NumberGenerator.java
Producer.java

7 files changed: +377 -0 lines changed
KafkaRecord.java (new file, 21 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import java.io.Serializable;

import lombok.Data;

@SuppressWarnings("serial")
@Data
public class KafkaRecord implements Serializable
{
    String key;
    String value;
    Long timestamp;

    @Override
    public String toString()
    {
        return key + ":" + value;
    }
}
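
Lombok's @Data generates the accessors and equality members for this class at compile time. For readers not using Lombok, the hand-written equivalent would look roughly like this (a sketch, not part of the commit; the explicit toString() above suppresses Lombok's own):

    public String getKey()                    { return key; }
    public void setKey(String key)            { this.key = key; }
    public String getValue()                  { return value; }
    public void setValue(String value)        { this.value = value; }
    public Long getTimestamp()                { return timestamp; }
    public void setTimestamp(Long timestamp)  { this.timestamp = timestamp; }
    // ...plus equals(Object) and hashCode() over all three fields

The generated getKey() is what lets the method reference KafkaRecord::getKey, seen commented out in Main1 below, compile.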
Main1.java (new file, 71 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import java.time.LocalTime;
import java.util.Properties;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Main1
{
    static String TOPIC_IN = "Topic1-IN";
    static String BOOTSTRAP_SERVER = "localhost:9092";

    @SuppressWarnings("serial")
    public static void main( String[] args ) throws Exception
    {
        Producer<String> p = new Producer<String>(BOOTSTRAP_SERVER, StringSerializer.class.getName());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVER);
        props.put("client.id", "flink-example1");

        // Read data directly as <Key, Value> from Kafka: write a small class holding key and value
        // and use it to deserialise each Kafka record.
        // Reference => https://stackoverflow.com/questions/53324676/how-to-use-flinkkafkaconsumer-to-parse-key-separately-k-v-instead-of-t
        FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);

        kafkaConsumer.setStartFromLatest();

        // create a stream to ingest data from Kafka as a custom class with explicit key/value
        DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);

        // timeWindowAll supports windowing without grouping by key
        stream
            .timeWindowAll(Time.seconds(5))
            .reduce(new ReduceFunction<KafkaRecord>()
            {
                KafkaRecord result = new KafkaRecord();

                @Override
                public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
                {
                    System.out.println(LocalTime.now() + " -> " + record1 + " " + record2);

                    result.key = record1.key;
                    result.value = record1.value + record2.value;

                    return result;
                }
            })
            .print(); // immediate printing to console

            //.keyBy( (KeySelector<KafkaRecord, String>) KafkaRecord::getKey )
            //.timeWindow(Time.seconds(5))

        // produce a number as a string every second
        new NumberGenerator(p, TOPIC_IN).start();

        // for a visual topology of the pipeline, paste the output below into https://flink.apache.org/visualizer/
        System.out.println( env.getExecutionPlan() );

        // start flink
        env.execute();
    }
}
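
The commented-out keyBy/timeWindow lines above hint at the keyed alternative to timeWindowAll. Spelled out, it would look roughly like the sketch below (not part of the commit; it relies on the Lombok-generated getKey() and needs org.apache.flink.api.java.functions.KeySelector imported):

    // keyed variant: one 5-second window per key instead of a single global window
    stream
        .keyBy((KeySelector<KafkaRecord, String>) KafkaRecord::getKey)
        .timeWindow(Time.seconds(5))
        .reduce((record1, record2) -> {
            KafkaRecord result = new KafkaRecord();
            result.key = record1.key;
            result.value = record1.value + record2.value;
            return result;
        })
        .print();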
Main2.java (new file, 73 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import java.time.LocalTime;
import java.util.Properties;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Main2
{
    static String TOPIC_IN = "Topic2-IN";
    static String TOPIC_OUT = "Topic2-OUT";
    static String BOOTSTRAP_SERVER = "localhost:9092";

    @SuppressWarnings("serial")
    public static void main( String[] args ) throws Exception
    {
        Producer<String> p = new Producer<String>(BOOTSTRAP_SERVER, StringSerializer.class.getName());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVER);
        props.put("client.id", "flink-example2");

        // alternate consumer that reads only the value of each record from the topic
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new SimpleStringSchema(), props);
        kafkaConsumer.setStartFromLatest();

        // create a Kafka producer from the Flink API
        Properties prodProps = new Properties();
        prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

        FlinkKafkaProducer<String> kafkaProducer =
            new FlinkKafkaProducer<String>(TOPIC_OUT,
                ((value, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, "myKey".getBytes(), value.getBytes())),
                prodProps,
                Semantic.EXACTLY_ONCE);

        // create a stream to ingest data from Kafka with the value as String
        DataStream<String> stream = env.addSource(kafkaConsumer);

        stream
            .timeWindowAll(Time.seconds(5)) // no grouping per key
            .reduce(new ReduceFunction<String>()
            {
                @Override
                public String reduce(String value1, String value2) throws Exception
                {
                    System.out.println(LocalTime.now() + " -> " + value1 + " " + value2);
                    return value1 + value2;
                }
            })
            .addSink(kafkaProducer);

        // produce a number as a string every second
        new NumberGenerator(p, TOPIC_IN).start();

        // for a visual topology of the pipeline, paste the output below into https://flink.apache.org/visualizer/
        System.out.println( env.getExecutionPlan() );

        // start flink
        env.execute();
    }
}
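
One operational caveat with Semantic.EXACTLY_ONCE: it relies on Kafka transactions, and FlinkKafkaProducer sets transaction.timeout.ms to one hour by default, while brokers cap transactions at transaction.max.timeout.ms, 15 minutes by default, so the job can fail at startup against an out-of-the-box broker. If raising the broker limit is not an option, one workaround is to lower the producer-side timeout before constructing kafkaProducer; a sketch (the one-minute value is illustrative):

    // keep the transaction timeout at or below the broker's transaction.max.timeout.ms
    prodProps.setProperty("transaction.timeout.ms", "60000");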
Main3.java (new file, 103 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import java.util.Properties;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Main3
{
    static String TOPIC_IN = "Topic3-IN";
    static String TOPIC_OUT = "Topic3-OUT";
    static String BOOTSTRAP_SERVER = "localhost:9092";

    @SuppressWarnings("serial")
    public static void main( String[] args ) throws Exception
    {
        Producer<String> p = new Producer<String>(BOOTSTRAP_SERVER, StringSerializer.class.getName());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // event time is required to use allowed lateness
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVER);
        props.put("client.id", "flink-example3");

        // consumer that reads both key and value from the topic
        FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);

        // assign timestamps and watermarks so Flink can recognise late elements
        kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
        {
            @Override
            public long extractAscendingTimestamp(KafkaRecord record)
            {
                return record.timestamp;
            }
        });

        kafkaConsumer.setStartFromLatest();

        // create a Kafka producer from the Flink API
        Properties prodProps = new Properties();
        prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

        FlinkKafkaProducer<String> kafkaProducer =
            new FlinkKafkaProducer<String>(TOPIC_OUT,
                ((value, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, "myKey".getBytes(), value.getBytes())),
                prodProps,
                Semantic.EXACTLY_ONCE);

        // create a stream to ingest data from Kafka with key/value
        DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);

        stream
            .filter((record) -> record.value != null && !record.value.isEmpty())
            .keyBy(record -> record.key)
            .timeWindow(Time.seconds(5))
            .allowedLateness(Time.milliseconds(500))
            .aggregate(new AggregateFunction<KafkaRecord, String, String>() // the AggregateFunction API is very simple; the same result could be achieved with Flink's reduce
            {
                @Override
                public String createAccumulator() {
                    return "";
                }

                @Override
                public String add(KafkaRecord record, String accumulator) {
                    return accumulator + record.value.length();
                }

                @Override
                public String getResult(String accumulator) {
                    return accumulator;
                }

                @Override
                public String merge(String a, String b) {
                    return a + b;
                }
            })
            .addSink(kafkaProducer);

        // produce a number as a string every second
        new NumberGenerator(p, TOPIC_IN).start();

        // for a visual topology of the pipeline, paste the output below into https://flink.apache.org/visualizer/
        System.out.println( env.getExecutionPlan() );

        // start flink
        env.execute();
    }
}
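
AscendingTimestampExtractor assumes timestamps never decrease within a Kafka partition; records that violate this only trigger a logged warning. If the input may genuinely arrive out of order, Flink's BoundedOutOfOrdernessTimestampExtractor (from org.apache.flink.streaming.api.functions.timestamps) is the usual drop-in alternative; a sketch using the same KafkaRecord timestamp, with an illustrative 500 ms bound:

    // emit watermarks that lag the maximum seen timestamp by 500 ms
    kafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<KafkaRecord>(Time.milliseconds(500))
        {
            @Override
            public long extractTimestamp(KafkaRecord record)
            {
                return record.timestamp;
            }
        });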
MySchema.java (new file, 32 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@SuppressWarnings("serial")
public class MySchema implements KafkaDeserializationSchema<KafkaRecord>
{
    @Override
    public boolean isEndOfStream(KafkaRecord nextElement) {
        // returning false keeps the source running indefinitely
        return false;
    }

    @Override
    public KafkaRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        KafkaRecord data = new KafkaRecord();
        // guard against null keys/values (tombstones or keyless records), which
        // would otherwise throw a NullPointerException in the String constructor
        data.key = record.key() == null ? null : new String(record.key());
        data.value = record.value() == null ? null : new String(record.value());
        data.timestamp = record.timestamp();

        return data;
    }

    @Override
    public TypeInformation<KafkaRecord> getProducedType() {
        return TypeInformation.of(KafkaRecord.class);
    }
}
NumberGenerator.java (new file, 32 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

public class NumberGenerator extends Thread
{
    int counter = 0;
    final Producer<String> p;
    final String topic;

    public NumberGenerator(Producer<String> p, String topic)
    {
        this.p = p;
        this.topic = topic;
    }

    @Override
    public void run()
    {
        try
        {
            // effectively an infinite loop: runs until the counter overflows
            // or the thread is interrupted
            while( ++counter > 0 )
            {
                p.send(topic, "[" + counter + "]");

                Thread.sleep( 1000 );
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
Producer.java (new file, 45 additions & 0 deletions)
package org.pd.streaming.connector.kafka.examples;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer<T>
{
    String bootstrapServers;
    KafkaProducer<String, T> producer;

    public Producer(String kafkaServer, String serializerName)
    {
        this.bootstrapServers = kafkaServer;

        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerName);

        // create the producer
        producer = new KafkaProducer<String, T>(properties);
    }

    public void send(String topic, T message)
    {
        // create a producer record with a fixed key
        ProducerRecord<String, T> record = new ProducerRecord<String, T>(topic, "myKey", message);

        // send data - asynchronous
        producer.send(record);

        // flush after every record; this throttles throughput but keeps the example deterministic
        producer.flush();
    }

    public void close()
    {
        // flush and close the producer
        producer.close();
    }
}
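
For reference, the helper can also be used standalone; a minimal sketch (topic name and message are illustrative):

    Producer<String> p = new Producer<>("localhost:9092", StringSerializer.class.getName());
    p.send("example-topic", "hello");  // every record is keyed "myKey"
    p.close();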
