Description
Currently, the isProducerConnected and isConsumerConnected flags are set to true once the producer and consumer, respectively, have connected, as in the code snippet below. Once a flag is true, the publish/subscribe-to-topic methods can be called. The problem with this approach is that if node-rdkafka fails afterwards, the internal produce and consume calls cannot succeed because the producer/consumer is no longer connected through node-rdkafka, yet isProducerConnected and isConsumerConnected remain true, so no reconnection is attempted.
To address this problem, big-evil-kafka needs to attach persistent event listeners to both the producer and the consumer for the disconnected and event.error events. When either event fires, the corresponding isProducerConnected or isConsumerConnected flag is set to false, so that publishToTopic and subscribeToTopic attempt to reconnect the producer or consumer instead of calling produce and consume on a dead connection.
// backOff and retryOptions are defined elsewhere in the module.
async #connectProducer() {
  try {
    await backOff(() => {
      return new Promise((resolve, reject) => {
        // Remove any previously attached listeners for these events
        this.#producer.removeAllListeners('ready');
        this.#producer.removeAllListeners('event.error');
        this.#producer.removeAllListeners('connection.failure');

        // Register the listeners before connecting so no event is missed
        this.#producer.once('ready', () => {
          this.#isProducerConnected = true;
          console.log('Kafka producer successfully connected');
          // Once producer is connected, remove error listeners to avoid handling late errors.
          // This is the gap described above: later failures no longer reset the flag.
          this.#producer.removeAllListeners('event.error');
          this.#producer.removeAllListeners('connection.failure');
          resolve();
        });

        this.#producer.once('event.error', (err) => {
          this.#isProducerConnected = false;
          console.error(`Kafka producer connection error: ${err}`);
          reject(err);
        });

        this.#producer.once('connection.failure', (err) => {
          this.#isProducerConnected = false;
          console.error(
            `Kafka producer connection resulted in failure: ${err}`,
          );
          reject(err);
        });

        this.#producer.connect();
      });
    }, retryOptions);
  } catch (error) {
    // Keep the original error as the cause instead of stringifying it
    throw new Error('Kafka producer failed to connect', { cause: error });
  }
}
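
The proposed fix could look roughly like the sketch below. It is only an illustration under stated assumptions, not the library's implementation: ConnectionFlag and withConnection are hypothetical names, a plain EventEmitter stands in for the node-rdkafka producer so the example runs without a broker, and the listener uses the disconnected event name that node-rdkafka documents. The listeners are registered with on() and never removed, so a failure after the initial connection still resets the flag.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper (not part of big-evil-kafka): mirrors a client's
// connection state in a flag for the whole lifetime of the client.
class ConnectionFlag {
  #connected = false;

  constructor(client, label) {
    // Persistent listeners: registered once with on() and never removed.
    client.on('ready', () => {
      this.#connected = true;
    });
    client.on('disconnected', () => {
      this.#connected = false;
      console.warn(`Kafka ${label} disconnected`);
    });
    client.on('event.error', (err) => {
      this.#connected = false;
      console.error(`Kafka ${label} error: ${err}`);
    });
  }

  get connected() {
    return this.#connected;
  }
}

// Hypothetical guard: reconnect first when the flag is down, then run the action.
// `connect` and `action` are caller-supplied functions.
async function withConnection(flag, connect, action) {
  if (!flag.connected) {
    await connect();
  }
  return action();
}

// Stand-in for a node-rdkafka producer so the sketch runs without a broker.
const fakeProducer = new EventEmitter();
const producerFlag = new ConnectionFlag(fakeProducer, 'producer');
```

In publishToTopic, a guard like withConnection(producerFlag, connectFn, produceFn) would then call the existing connect routine only when the flag has been reset, and the same pattern would apply to the consumer in subscribeToTopic.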