Commit 82bdae9

Unify KafkaConsumers (i.e. KafkaAutoCommitConsumer & KafkaManualCommitConsumer)

1 parent 08b5433 · commit 82bdae9

24 files changed: +464 -476 lines

CMakeLists.txt (+1 -1)

@@ -1,6 +1,6 @@
 cmake_minimum_required(VERSION "3.8")
 
-project("Modern C++ based Kafka API" VERSION 1.0.0)
+project("Modern C++ Kafka API" VERSION 1.0.0)
 
 include(CTest)
 include(CheckCXXCompilerFlag)

NOTICE (+1 -1)

@@ -1,3 +1,3 @@
-Modern C++ based Kafka API
+Modern C++ Kafka API
 Copyright 2020 Morgan Stanley
 This project includes software developed at Morgan Stanley.

README.md (+4 -2)

@@ -176,12 +176,14 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t
 
     * `max.poll.records` (default: `500`): The maxmum number of records that a single call to `poll()` would return
 
+    * Property which overrides the one from ***librdkafka***
+
+        * `enable.auto.commit` (default: `false`): To automatically commit the previously polled offsets on each `poll` operation
+
     * Properties not supposed to be used (internally shadowed by ***modern-cpp-kafka***)
 
         * `enable.auto.offset.store`
 
-        * `enable.auto.commit`
-
         * `auto.commit.interval.ms`
 
 * KafkaProducer

doc/GoodPracticesToUseKafkaConsumer.md (+2 -2)

@@ -16,7 +16,7 @@ If we want to achieve high performance/availability, here're some rules of thumb
 
 ## How to avoid polling duplicated messages
 
-* A `KafkaManualCommitConsumer` could help to commit the offsets more frequently (e.g, always do commit after finishing processing a message).
+* To commit the offsets more frequently (e.g, always do commit after finishing processing a message).
 
-* Don't use quite a large `MAX_POLL_RECORDS` for a `KafkaAutoCommitConsumer`, -- you might fail to commit all these messages before crash, thus more duplications with the next `poll`.
+* Don't use quite a large `MAX_POLL_RECORDS` for a `KafkaConsumer` (with `enable.auto.commit=true`) -- you might fail to commit all these messages before crash, thus more duplications with the next `poll`.
 
doc/KafkaConsumerQuickStart.md (+1 -1)

@@ -1,6 +1,6 @@
 # KafkaConsumer Quick Start
 
-Generally speaking, The `Modern C++ based Kafka API` is quite similar with [Kafka Java's API](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
+Generally speaking, The `Modern C++ Kafka API` is quite similar with [Kafka Java's API](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
 
 We'd recommend users to cross-reference them, --especially the examples.
 
doc/KafkaProducerQuickStart.md (+1 -1)

@@ -1,6 +1,6 @@
 # KafkaProducer Quick Start
 
-Generally speaking, The `Modern C++ based Kafka API` is quite similar to the [Kafka Java's API](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html).
+Generally speaking, The `Modern C++ Kafka API` is quite similar to the [Kafka Java's API](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html).
 
 We'd recommend users to cross-reference them, --especially the examples.
 

examples/kafka_auto_commit_consumer.cc (+3 -2)

@@ -17,11 +17,12 @@ int main(int argc, char **argv)
 
     // Create configuration object
     kafka::Properties props ({
-        {"bootstrap.servers", brokers},
+        {"bootstrap.servers",  brokers},
+        {"enable.auto.commit", "true"}
     });
 
     // Create a consumer instance.
-    kafka::KafkaAutoCommitConsumer consumer(props);
+    kafka::KafkaConsumer consumer(props);
 
     // Subscribe to topics
     consumer.subscribe({topic});

examples/kafka_manual_commit_consumer.cc (+1 -1)

@@ -21,7 +21,7 @@ int main(int argc, char **argv)
     });
 
     // Create a consumer instance.
-    kafka::KafkaManualCommitConsumer consumer(props);
+    kafka::KafkaConsumer consumer(props);
 
     // Subscribe to topics
     consumer.subscribe({topic});

include/kafka/AdminClient.h (+2 -1)

@@ -89,7 +89,8 @@ class AdminClient: public KafkaClient
 {
 public:
     explicit AdminClient(const Properties& properties)
-        : KafkaClient(ClientType::AdminClient, KafkaClient::validateAndReformProperties(properties))
+        : KafkaClient(ClientType::AdminClient,
+                      KafkaClient::validateAndReformProperties(properties))
     {
     }
 

include/kafka/ConsumerConfig.h (+5 -0)

@@ -35,6 +35,11 @@ class ConsumerConfig: public Properties
     */
    static const constexpr char* CLIENT_ID = "client.id";
 
+    /**
+     * Automatically commits previously polled offsets on each `poll` operation.
+     */
+    static const constexpr char* ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
    /**
     * This property controls the behavior of the consumer when it starts reading a partition for which it doesn't have a valid committed offset.
     * The "latest" means the consumer will begin reading the newest records written after the consumer started. While "earliest" means that the consumer will read from the very beginning.
