Skip to content

Commit b57553e

Browse files
author
Amit Kumatr
committed
format the code
1 parent 6cd68f9 commit b57553e

File tree

5 files changed

+64
-60
lines changed

5 files changed

+64
-60
lines changed
Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package com.baeldung.kafka.commitoffset;
22

33
import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;
4+
45
import java.time.Duration;
6+
57
import org.apache.kafka.clients.consumer.ConsumerRecord;
68
import org.apache.kafka.clients.consumer.ConsumerRecords;
79
import org.apache.kafka.clients.consumer.KafkaConsumer;
810

911
public class AsyncCommit {
1012

11-
public static void main(String[] args) {
13+
public static void main(String[] args) {
1214

13-
KafkaConsumer<Long, String> consumer =
14-
new KafkaConsumer<>(KafkaConfigProperties.getProperties());
15-
consumer.subscribe(KafkaConfigProperties.getTopic());
16-
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
17-
for (ConsumerRecord<Long, String> message : messages) {
18-
// processed message
19-
consumer.commitAsync();
15+
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
16+
consumer.subscribe(KafkaConfigProperties.getTopic());
17+
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
18+
for (ConsumerRecord<Long, String> message : messages) {
19+
// processed message
20+
consumer.commitAsync();
21+
}
2022
}
21-
}
2223
}
Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
package com.baeldung.kafka.commitoffset;
22

33
import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;
4+
45
import java.time.Duration;
56
import java.util.Properties;
7+
68
import org.apache.kafka.clients.consumer.ConsumerConfig;
79
import org.apache.kafka.clients.consumer.ConsumerRecord;
810
import org.apache.kafka.clients.consumer.ConsumerRecords;
911
import org.apache.kafka.clients.consumer.KafkaConsumer;
1012

1113
public class AutomaticCommit {
1214

13-
public static void main(String[] args) {
15+
public static void main(String[] args) {
1416

15-
Properties properties = KafkaConfigProperties.getProperties();
16-
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
17-
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties);
18-
consumer.subscribe(KafkaConfigProperties.getTopic());
19-
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
20-
for (ConsumerRecord<Long, String> message : messages) {
21-
// processed message
17+
Properties properties = KafkaConfigProperties.getProperties();
18+
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
19+
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties);
20+
consumer.subscribe(KafkaConfigProperties.getTopic());
21+
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
22+
for (ConsumerRecord<Long, String> message : messages) {
23+
// processed message
24+
}
2225
}
23-
}
2426
}
Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,34 @@
11
package com.baeldung.kafka.commitoffset;
22

33
import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;
4+
45
import java.time.Duration;
56
import java.util.HashMap;
67
import java.util.Map;
8+
79
import org.apache.kafka.clients.consumer.ConsumerRecord;
810
import org.apache.kafka.clients.consumer.ConsumerRecords;
911
import org.apache.kafka.clients.consumer.KafkaConsumer;
1012
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
1113
import org.apache.kafka.common.TopicPartition;
1214

1315
public class SpecificOffsetCommit {
14-
public static void main(String[] args) {
16+
public static void main(String[] args) {
1517

16-
KafkaConsumer<Long, String> consumer =
17-
new KafkaConsumer<>(KafkaConfigProperties.getProperties());
18-
consumer.subscribe(KafkaConfigProperties.getTopic());
19-
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
20-
int messageProcessed = 0;
21-
while (true) {
22-
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
23-
for (ConsumerRecord<Long, String> message : messages) {
24-
// processed message
25-
messageProcessed++;
26-
currentOffsets.put(
27-
new TopicPartition(message.topic(), message.partition()),
28-
new OffsetAndMetadata(message.offset() + 1));
29-
if (messageProcessed % 50 == 0) {
30-
consumer.commitSync(currentOffsets);
18+
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
19+
consumer.subscribe(KafkaConfigProperties.getTopic());
20+
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
21+
int messageProcessed = 0;
22+
while (true) {
23+
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
24+
for (ConsumerRecord<Long, String> message : messages) {
25+
// processed message
26+
messageProcessed++;
27+
currentOffsets.put(new TopicPartition(message.topic(), message.partition()), new OffsetAndMetadata(message.offset() + 1));
28+
if (messageProcessed % 50 == 0) {
29+
consumer.commitSync(currentOffsets);
30+
}
31+
}
3132
}
32-
}
3333
}
34-
}
3534
}
Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package com.baeldung.kafka.commitoffset;
22

33
import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;
4+
45
import java.time.Duration;
6+
57
import org.apache.kafka.clients.consumer.ConsumerRecord;
68
import org.apache.kafka.clients.consumer.ConsumerRecords;
79
import org.apache.kafka.clients.consumer.KafkaConsumer;
810

911
public class SyncCommit {
1012

11-
public static void main(String[] args) {
13+
public static void main(String[] args) {
1214

13-
KafkaConsumer<Long, String> consumer =
14-
new KafkaConsumer<>(KafkaConfigProperties.getProperties());
15-
consumer.subscribe(KafkaConfigProperties.getTopic());
16-
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
17-
for (ConsumerRecord<Long, String> message : messages) {
18-
// processed message
19-
consumer.commitSync();
15+
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
16+
consumer.subscribe(KafkaConfigProperties.getTopic());
17+
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
18+
for (ConsumerRecord<Long, String> message : messages) {
19+
// processed message
20+
consumer.commitSync();
21+
}
2022
}
21-
}
2223
}

apache-kafka-3/src/main/java/com/baeldung/kafka/commitoffset/config/KafkaConfigProperties.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,30 @@
22

33
import java.util.ArrayList;
44
import java.util.Properties;
5+
56
import org.apache.kafka.clients.consumer.ConsumerConfig;
67
import org.apache.kafka.common.serialization.StringDeserializer;
78

89
/**
910
* @author amitkumar
1011
*/
1112
public class KafkaConfigProperties {
12-
public static final String MY_TOPIC = "my-topic";
13+
public static final String MY_TOPIC = "my-topic";
1314

14-
public static Properties getProperties() {
15+
public static Properties getProperties() {
1516

16-
Properties props = new Properties();
17-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
18-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer");
19-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
20-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
21-
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
22-
return props;
23-
}
17+
Properties props = new Properties();
18+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
19+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
20+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer");
21+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
22+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
23+
return props;
24+
}
2425

25-
public static ArrayList<String> getTopic() {
26-
ArrayList<String> topics = new ArrayList<>();
27-
topics.add(MY_TOPIC);
28-
return topics;
29-
}
26+
public static ArrayList<String> getTopic() {
27+
ArrayList<String> topics = new ArrayList<>();
28+
topics.add(MY_TOPIC);
29+
return topics;
30+
}
3031
}

0 commit comments

Comments
 (0)