Skip to content

Commit 1735ba6

Browse files
committed
add consumer-interceptor example
1 parent 3f2e6ff commit 1735ba6

File tree

7 files changed

+403
-0
lines changed

7 files changed

+403
-0
lines changed

consumer-interceptor/pom.xml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the Kafka consumer-interceptor example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kafka-examples</artifactId>
        <groupId>com.github.fhuss</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer-interceptor</artifactId>
    <name>kafka-examples-consumer-interceptor</name>

    <dependencies>
        <!-- No version element: presumably inherited from the parent's
             dependencyManagement — confirm against the parent pom. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.github.fhuss.kafka.examples.consumer;
18+
19+
import com.github.fhuss.kafka.examples.consumer.internal.Time;
20+
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
24+
import org.apache.kafka.clients.producer.KafkaProducer;
25+
import org.apache.kafka.clients.producer.Producer;
26+
import org.apache.kafka.clients.producer.ProducerConfig;
27+
import org.apache.kafka.clients.producer.ProducerRecord;
28+
import org.apache.kafka.common.TopicPartition;
29+
import org.apache.kafka.common.errors.InterruptException;
30+
import org.apache.kafka.common.serialization.StringSerializer;
31+
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
public class AuditConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
37+
38+
// Record Metadata
39+
private static final String TRACKING_PARTITION = "partition";
40+
private static final String TRACKING_OFFSET = "offset";
41+
private static final String TRACKING_TIMESTAMP = "timestamp";
42+
private static final String TRACKING_TOPIC = "topic";
43+
44+
private static final String JSON_OPEN_BRACKET = "{";
45+
private static final String JSON_CLOSE_BRACKET = "}";
46+
47+
private String originalsClientId;
48+
49+
private AuditInterceptorConfig configs;
50+
51+
private Producer<String, String> producer;
52+
53+
@Override
54+
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
55+
records.forEach(r -> {
56+
final String value = getJsonTrackingMessage(r);
57+
producer.send(new ProducerRecord<>(configs.getAuditTopic(), value));
58+
});
59+
return null;
60+
}
61+
62+
@Override
63+
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
64+
65+
}
66+
67+
private String getJsonTrackingMessage(ConsumerRecord<K, V> record) {
68+
return JSON_OPEN_BRACKET +
69+
"\"" + "timestamp" + "\":\"" + Time.SYSTEM.milliseconds() + "\"" +
70+
"\",client\":" +
71+
JSON_OPEN_BRACKET +
72+
"\"" + "clientId" + "\":\"" + originalsClientId + "\"" +
73+
",\"" + "applicationId" + "\":\"" + configs.getAuditApplicationId() + "\"" +
74+
",\"" + "type" + "\":\"consumer\"" +
75+
JSON_CLOSE_BRACKET +
76+
",\"record\":" +
77+
JSON_OPEN_BRACKET +
78+
"\"" + TRACKING_PARTITION + "\":\"" + record.partition() + "\"" +
79+
",\"" + TRACKING_TOPIC + "\":\"" + record.topic() + "\"" +
80+
",\"" + TRACKING_OFFSET + "\":\"" + record.offset() + "\"" +
81+
",\"" + TRACKING_TIMESTAMP + "\":\"" + record.timestamp() + "\"" +
82+
JSON_CLOSE_BRACKET +
83+
JSON_CLOSE_BRACKET;
84+
}
85+
86+
@Override
87+
public void close() {
88+
if (this.producer != null) {
89+
try {
90+
this.producer.close();
91+
} catch (InterruptException e) {
92+
Thread.currentThread().interrupt();
93+
}
94+
}
95+
}
96+
97+
@Override
98+
public void configure(Map<String, ?> configs) {
99+
final Map<String, Object> copyConfigs = new HashMap<>(configs);
100+
// Drop interceptor classes configuration to not introduce loop.
101+
copyConfigs.remove(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
102+
103+
this.originalsClientId = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
104+
105+
String interceptorClientId = (originalsClientId == null) ?
106+
"interceptor-consumer-" + ClientIdGenerator.nextClientId() :
107+
"interceptor-" + originalsClientId;
108+
109+
copyConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, interceptorClientId);
110+
111+
this.configs = new AuditInterceptorConfig(copyConfigs);
112+
113+
copyConfigs.putAll(this.configs.getOverrideProducerConfigs());
114+
115+
// Enforce some properties to get a non-blocking producer;
116+
copyConfigs.put(ProducerConfig.RETRIES_CONFIG, "0");
117+
copyConfigs.put(ProducerConfig.ACKS_CONFIG, "1");
118+
copyConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");
119+
copyConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
120+
copyConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
121+
122+
this.producer = new KafkaProducer<>(copyConfigs);
123+
}
124+
125+
private static class ClientIdGenerator {
126+
127+
private static final AtomicInteger IDS = new AtomicInteger(0);
128+
129+
static int nextClientId() {
130+
return IDS.getAndIncrement();
131+
}
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.github.fhuss.kafka.examples.consumer;
18+
19+
import org.apache.kafka.common.config.AbstractConfig;
20+
import org.apache.kafka.common.config.ConfigDef;
21+
22+
import java.util.Map;
23+
24+
public class AuditInterceptorConfig extends AbstractConfig {
25+
26+
public static final String AUDIT_APPLICATION_ID_CONFIG = "audit.application.id";
27+
public static final String AUDIT_APPLICATION_ID_DOC = "The application id used to identify the producer";
28+
29+
public static final String AUDIT_TOPIC_CONFIG = "audit.topic";
30+
public static final String AUDIT_TOPIC_DOC = "The topic name";
31+
32+
private static final ConfigDef CONFIG;
33+
public static final String AUDIT_PRODUCER_PREFIX = "audit.producer.";
34+
35+
static {
36+
CONFIG = new ConfigDef()
37+
.define(AUDIT_APPLICATION_ID_CONFIG, ConfigDef.Type.STRING,
38+
ConfigDef.Importance.HIGH, AUDIT_APPLICATION_ID_DOC)
39+
.define(AUDIT_TOPIC_CONFIG, ConfigDef.Type.STRING,
40+
ConfigDef.Importance.HIGH, AUDIT_TOPIC_DOC);
41+
42+
}
43+
44+
/**
45+
* Creates a new {@link AuditInterceptorConfig} instance.
46+
*
47+
* @param originals the interceptor configuration.
48+
*/
49+
AuditInterceptorConfig(final Map<String, ?> originals) {
50+
super(CONFIG, originals);
51+
}
52+
53+
/**
54+
* Creates a new {@link AuditInterceptorConfig} instance.
55+
*
56+
* @param definition the {@link ConfigDef} instance.
57+
* @param originals the interceptor configuration.
58+
*/
59+
private AuditInterceptorConfig(final ConfigDef definition,
60+
final Map<String, ?> originals) {
61+
super(definition, originals);
62+
}
63+
64+
public Map<String, Object> getOverrideProducerConfigs() {
65+
return originalsWithPrefix(AUDIT_PRODUCER_PREFIX);
66+
}
67+
68+
69+
public String getAuditTopic() {
70+
return this.getString(AUDIT_TOPIC_CONFIG);
71+
}
72+
73+
public String getAuditApplicationId() {
74+
return this.getString(AUDIT_APPLICATION_ID_CONFIG);
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.github.fhuss.kafka.examples.consumer;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecords;
22+
import org.apache.kafka.clients.consumer.KafkaConsumer;
23+
import org.apache.kafka.common.errors.WakeupException;
24+
import org.apache.kafka.common.serialization.StringDeserializer;
25+
26+
import java.util.Collections;
27+
import java.util.Properties;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
public class ConsumerWithInterceptor {
32+
33+
private static final AtomicBoolean closed = new AtomicBoolean(false);
34+
35+
private static final CountDownLatch latch = new CountDownLatch(1);
36+
37+
public static void main(String[] args) {
38+
39+
final Properties configs = newConsumerConfigs("localhost:9092", "group-1");
40+
final Consumer<String, String> consumer = new KafkaConsumer<>(configs);
41+
42+
// Add shutdown hook to respond to SIGTERM and gracefully stop the application.
43+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
44+
System.out.println("Closing application gracefully (SIGTERM)");
45+
closed.set(true);
46+
consumer.wakeup();
47+
try {
48+
latch.await();
49+
} catch (InterruptedException ignore) {
50+
}
51+
System.out.println("Closed");
52+
}));
53+
54+
try {
55+
consumer.subscribe(Collections.singleton("my-topic"));
56+
consumer.poll(0); // trigger partition assignments.
57+
58+
// Starting consumption
59+
while (!closed.get()) {
60+
ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
61+
// Handle new records
62+
if (records != null) {
63+
records.forEach(r -> {
64+
System.out.printf("Consumed record : key=%s, value=%s", r.key(), r.value());
65+
});
66+
}
67+
}
68+
} catch (WakeupException e) {
69+
// Ignore exception if closing
70+
if (!closed.get()) throw e;
71+
} finally {
72+
System.out.println("Closing consumer");
73+
consumer.close();
74+
latch.countDown();
75+
}
76+
}
77+
78+
static Properties newConsumerConfigs(final String bootstrapServer,
79+
final String group) {
80+
Properties props = new Properties();
81+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
82+
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
83+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
84+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
85+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
86+
87+
// Configure interceptor and attached configuration.
88+
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, AuditConsumerInterceptor.class.getName());
89+
props.put(AuditInterceptorConfig.AUDIT_TOPIC_CONFIG, "tracking-clients");
90+
props.put(AuditInterceptorConfig.AUDIT_APPLICATION_ID_CONFIG, "kafka-clients-examples");
91+
return props;
92+
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.github.fhuss.kafka.examples.consumer.internal;
18+
19+
/**
 * Minimal clock abstraction so that time-dependent code can be exercised
 * against a controllable implementation.
 */
public interface Time {

    /** Shared wall-clock instance backed by the JVM's system clock. */
    Time SYSTEM = new SystemTime();

    /**
     * Returns the current time in milliseconds.
     */
    long milliseconds();

    /**
     * Sleep for the given number of milliseconds
     */
    void sleep(long ms);

    /** Default implementation delegating to {@link System} and {@link Thread}. */
    class SystemTime implements Time {

        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        @Override
        public void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException wokenEarly) {
                // Treat interruption as an early wake-up, but keep the
                // thread's interrupt status set for callers that care.
                Thread.currentThread().interrupt();
            }
        }
    }
}

0 commit comments

Comments
 (0)