Commit 4468c87

Merge pull request #4 from cinemataztic/kill-process-repeated-attempts-retry
Exit the process in case producer/consumer fails to connect after multiple retries
2 parents d19611e + 48c813c commit 4468c87

File tree

5 files changed: +37 −27 lines changed

README.md (+9 −9)
````diff
@@ -1,8 +1,8 @@
 # Big Evil Kafka
 
-Wrapper package around [node-rdkafka](https://www.npmjs.com/package/node-rdkafka) where only the configuration is required, and the package can be used instantly with just the essentials. Dont be scared from the name, Kafka is cool and the name is a nod to the [Undertaker's](https://en.wikipedia.org/wiki/The_Undertaker) biker persona in the early 2000's.
+Wrapper package around [node-rdkafka](https://www.npmjs.com/package/node-rdkafka) where only the configuration is required, and the package can be used instantly with just the essentials. Don't be scared of the name: Kafka is cool, and the name is a nod to the [Undertaker's](https://en.wikipedia.org/wiki/The_Undertaker) biker persona in the early 2000s.
 
-The purpose of this package is to provide a batteries included package where one does not have to worry about configuring the [node-rdkafka](https://www.npmjs.com/package/node-rdkafka) package for using kafka client's functions like sending a message to a topic and consuming a message to a topic. The package handles producer/consumer connection internally and only allows disconnecting both producer and consumer connection.
+The purpose of this package is to provide a batteries-included package where one does not have to worry about configuring [node-rdkafka](https://www.npmjs.com/package/node-rdkafka) to use Kafka client functions such as sending a message to a topic and consuming a message from a topic. The package handles the producer/consumer connections internally and only exposes methods to disconnect the producer and the consumer.
 
 ## Getting started
 
@@ -14,7 +14,7 @@ npm i @cinemataztic/big-evil-kafka
 
 ## Prerequisites
 
-This package uses [confluent-schema-registry](https://www.npmjs.com/package/@kafkajs/confluent-schema-registry) and assumes that schema registry is in place along with kafka server running in the background.
+This package uses [confluent-schema-registry](https://www.npmjs.com/package/@kafkajs/confluent-schema-registry) and assumes that a schema registry is in place and a Kafka server is running in the background.
 
 ## Usage
 
@@ -26,7 +26,7 @@ const { KafkaClient } = require('@cinemataztic/big-evil-kafka');
 
 ## Configuration
 
-Configurations must be passed to the KafkaClient in order to initialize node-rdkafka producer and consumer internally.
+Configuration must be passed to the KafkaClient to initialize the node-rdkafka producer and consumer internally.
 
 ### `config`
 
@@ -44,13 +44,13 @@ Configurations must be passed to the KafkaClient in order to initialize node-rdk
 
 - `brokers?: Array`
 
-The list of brokers that specifies the Kafka broker(s), the producer and consumer should connect to. Brokers needs to be passed as an array i.e `['localhost:9092', 'kafka:29092']` because the package internally converts them to string as per the requirement for node-rdkafka that requires `metadata.broker.list` as a string.
+The list of Kafka broker(s) the producer and consumer should connect to. Brokers must be passed as an array, e.g. `['localhost:9092', 'kafka:29092']`, because the package internally converts the array to a string, as node-rdkafka requires `metadata.broker.list` to be a string.
 
 Default value is `['localhost:9092']`.
 
 - `avroSchemaRegistry?: string`
 
-The schema registry URL which helps in encoding and decoding the messages according to a specific avro schema in a subject.
+The schema registry URL, which is used to encode and decode messages according to a specific Avro schema in a subject.
 
 Default value is `http://localhost:8081`.
 
@@ -83,7 +83,7 @@ To consume a message from a topic, provide the topic name and a callback functio
 client.consumeMessage(topic, onMessage);
 ```
 
-## Disconnecting
+## Disconnection
 
 To disconnect either the producer or the consumer, call the following methods respectively.
 ```js
@@ -95,6 +95,6 @@ client.disconnectConsumer();
 
 ## Motivation
 
-Many of our services are relying upon the kafka message queue system. The problem with using node-rdkafka in each of the different services is that in case of any change to kafka configuration, it had to be replicated across different services for consistency and also the manual setup and configuration of node-rdkafka is not simple and requires a lot of effort to set it up in a way that ensures maintainability.
+Many of our services rely on the Kafka message queue system. The problem with using node-rdkafka in each service is that any change to the Kafka configuration has to be replicated across services for consistency; moreover, manually setting up node-rdkafka is not simple and requires significant effort to do in a maintainable way.
 
-Having a wrapper package around node-rdkafka enables us to not only utilize [exponential backoff](https://www.npmjs.com/package/exponential-backoff) for consumer/producer retry mechanism but also to provide a batteries included package that would simply allow users to send and consume messages and with additional ability to disconnect them in case of an error in the services. The user only needs to provide the configuration to use the package and can use the package without worrying about to set it up with respect to node-rdkafka connection and its methods that can be somewhat hard to implement if not well-versed in how node-rdkafka works.
+Having a wrapper package around node-rdkafka lets us not only use [exponential backoff](https://www.npmjs.com/package/exponential-backoff) as the consumer/producer retry mechanism, but also provide a batteries-included package that simply allows users to send and consume messages, with the additional ability to disconnect in case of an error in the services.
````
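The brokers-to-string conversion the README describes can be sketched as below. `toBrokerList` is a hypothetical helper name for illustration (the package's internal implementation is not shown in this diff); the comma separator matches librdkafka's `metadata.broker.list` format.

```javascript
// Hypothetical helper illustrating the conversion the README describes:
// node-rdkafka expects `metadata.broker.list` as a single string, so the
// brokers array is joined into a comma-separated list.
const toBrokerList = (brokers) => brokers.join(',');

console.log(toBrokerList(['localhost:9092', 'kafka:29092']));
// → localhost:9092,kafka:29092
```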

package-lock.json (+2 −2)

Some generated files are not rendered by default.

package.json (+1 −1)

```diff
@@ -1,6 +1,6 @@
 {
   "name": "@cinemataztic/big-evil-kafka",
-  "version": "1.0.6",
+  "version": "1.0.8",
   "description": "A wrapper around node-rdkafka",
   "main": "src/index.js",
   "scripts": {
```

src/index.js (+24 −14)

```diff
@@ -127,6 +127,11 @@ class KafkaClient {
         this.#producer.once('ready', () => {
           this.#isProducerConnected = true;
           console.log('Kafka producer successfully connected');
+
+          // Once producer is connected, remove error listeners to avoid handling late errors
+          this.#producer.removeAllListeners('event.error');
+          this.#producer.removeAllListeners('connection.failure');
+
           resolve();
         });
 
@@ -143,10 +148,10 @@
             );
             reject(err);
           });
-        }, retryOptions);
-      });
+        });
+      }, retryOptions);
     } catch (error) {
-      throw new Error(`Error connecting to Kafka producer: ${error}`);
+      throw new Error(error);
     }
   }
 
@@ -168,6 +173,11 @@
         this.#consumer.once('ready', () => {
           this.#isConsumerConnected = true;
           console.log('Kafka consumer successfully connected');
+
+          // Once consumer is connected, remove error listeners to avoid handling late errors
+          this.#consumer.removeAllListeners('event.error');
+          this.#consumer.removeAllListeners('connection.failure');
+
           resolve();
         });
 
@@ -184,10 +194,10 @@
             );
             reject(err);
           });
-        }, retryOptions);
-      });
+        });
+      }, retryOptions);
     } catch (error) {
-      throw new Error(`Error connecting to Kafka consumer: ${error}`);
+      throw new Error(error);
     }
   }
 
@@ -202,7 +212,10 @@
         await this.#connectProducer();
       }
     } catch (error) {
-      throw new Error(`Error initializing producer: ${error}`);
+      console.error(
+        `Error occurred connecting to Kafka producer: ${error.message}`,
+      );
+      process.exit(1);
     }
   }
 
@@ -217,7 +230,10 @@
         await this.#connectConsumer();
       }
     } catch (error) {
-      throw new Error(`Error initializing consumer: ${error}`);
+      console.error(
+        `Error occurred connecting to Kafka consumer: ${error.message}`,
+      );
+      process.exit(1);
     }
   }
 
@@ -247,9 +263,6 @@
       );
 
       console.log(`Successfully published data to topic: ${topic}`);
-      } else {
-        console.error('Major issue with the kafka producer init process.');
-        throw new Error('Unable to initialize kafka producer');
       }
     } catch (error) {
       console.error(
@@ -295,9 +308,6 @@
         );
       }
     });
-      } else {
-        console.error('Major issue with the kafka consumer init process.');
-        throw new Error('Unable to initialize kafka consumer');
       }
     } catch (error) {
       console.error(
```
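The reordered closing lines above also fix where `retryOptions` is passed: the retried callback now returns the whole connection Promise, and `retryOptions` goes to `backOff` itself. A minimal sketch of that pattern, using a simplified local stand-in for the exponential-backoff package's `backOff` (not the real implementation, and `connect` is a simulated connection, not the package's code):

```javascript
// Simplified stand-in for exponential-backoff's backOff(fn, options):
// retries fn until it resolves, the retry callback returns false, or
// numOfAttempts is exhausted, doubling the wait via timeMultiple.
async function backOff(fn, { numOfAttempts, startingDelay, timeMultiple, retry }) {
  let delay = startingDelay;
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= numOfAttempts || retry(error, attempt) === false) throw error;
      await new Promise((r) => setTimeout(r, delay));
      delay *= timeMultiple;
    }
  }
}

// Simulated connect that fails twice before succeeding.
let calls = 0;
const connect = () =>
  new Promise((resolve, reject) => {
    calls += 1;
    calls < 3 ? reject(new Error('broker unreachable')) : resolve('ready');
  });

backOff(connect, {
  numOfAttempts: 5,
  startingDelay: 1, // 1 ms so the demo runs fast; the real config uses 1000 ms
  timeMultiple: 2,
  retry: (error, attemptNumber) => {
    console.error(`Attempt ${attemptNumber} failed due to error: ${error}`);
    return true;
  },
}).then((status) => console.log(`connected after ${calls} attempts: ${status}`));
```

With the old nesting, the Promise closed outside `backOff`, so failed connection attempts were not re-run; with the corrected nesting, a rejection inside the Promise propagates to `backOff` and triggers the next attempt.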

src/utils/retryOptions.js (+1 −1)

```diff
@@ -9,7 +9,7 @@
  */
 const retryOptions = {
   startingDelay: 1000,
-  numOfAttempts: 3,
+  numOfAttempts: 5,
   timeMultiple: 2,
   retry: (error, attemptNumber) => {
     console.error(`Attempt ${attemptNumber} failed due to error: ${error}`);
```
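Bumping `numOfAttempts` from 3 to 5 changes the worst-case retry schedule. Assuming the exponential-backoff package's default of no delay before the first attempt, the wait before retry k is `startingDelay * timeMultiple ** (k - 1)`:

```javascript
// Worst-case delay schedule implied by the new retryOptions.
const startingDelay = 1000;
const timeMultiple = 2;
const numOfAttempts = 5;

// One delay before each retry (attempts 2..numOfAttempts).
const delays = Array.from(
  { length: numOfAttempts - 1 },
  (_, k) => startingDelay * timeMultiple ** k,
);

console.log(delays); // [ 1000, 2000, 4000, 8000 ]
console.log(delays.reduce((a, b) => a + b, 0)); // 15000 ms of back-off before giving up
```

So the process now waits up to ~15 s across five attempts before `process.exit(1)` fires, instead of ~3 s across three.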
