Skip to content

Commit 95f9f4c

Browse files
Add persistent event listeners to monitor connection health for both producer and consumer
1 parent 3aa4b02 commit 95f9f4c

File tree

1 file changed

+92
-51
lines changed

1 file changed

+92
-51
lines changed

src/index.js

+92-51
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,48 @@ class KafkaClient {
107107
);
108108

109109
this.#registry = new SchemaRegistry({ host: this.#avroSchemaRegistry });
110+
111+
// Set up persistent event listeners to monitor connection health for both producer and consumer
112+
this.#registerProducerEventListeners();
113+
this.#registerConsumerEventListeners();
114+
}
115+
116+
#registerProducerEventListeners() {
117+
// If the producer disconnects unexpectedly, set the isProducerConnected flag to false
118+
this.#producer.on('disconnected', () => {
119+
if (this.#isProducerConnected) {
120+
console.error('Kafka producer disconnected unexpectedly');
121+
this.#isProducerConnected = false;
122+
}
123+
});
124+
125+
// Capture event errors and set the isProducerConnected flag to false in case of an event error
126+
this.#producer.on('event.error', (error) => {
127+
console.error(`Kafka producer encountered event error: ${error}`);
128+
this.#isProducerConnected = false;
129+
});
130+
}
131+
132+
#registerConsumerEventListeners() {
133+
// If the consumer disconnects unexpectedly, set the isConsumerConnected flag to false
134+
this.#consumer.on('disconnected', () => {
135+
if (this.#isConsumerConnected) {
136+
console.error('Kafka consumer disconnected unexpectedly');
137+
138+
this.#isConsumerConnected = false;
139+
clearInterval(this.#intervalId);
140+
this.#intervalId = null;
141+
}
142+
});
143+
144+
// Capture event errors and set the isConsumerConnected flag to false in case of an event error
145+
this.#consumer.on('event.error', (error) => {
146+
console.error(`Kafka consumer encountered event error: ${error}`);
147+
148+
this.#isConsumerConnected = false;
149+
clearInterval(this.#intervalId);
150+
this.#intervalId = null;
151+
});
110152
}
111153

112154
/**
@@ -119,7 +161,6 @@ class KafkaClient {
119161
return new Promise((resolve, reject) => {
120162
// Remove any previously attached listeners for these events
121163
this.#producer.removeAllListeners('ready');
122-
this.#producer.removeAllListeners('event.error');
123164
this.#producer.removeAllListeners('connection.failure');
124165

125166
this.#producer.connect();
@@ -129,24 +170,17 @@ class KafkaClient {
129170
console.log('Kafka producer successfully connected');
130171

131172
// Once producer is connected, remove error listeners to avoid handling late errors
132-
this.#producer.removeAllListeners('event.error');
133173
this.#producer.removeAllListeners('connection.failure');
134174

135175
resolve();
136176
});
137177

138-
this.#producer.once('event.error', (err) => {
139-
this.#isProducerConnected = false;
140-
console.error(`Kafka producer connection error: ${err}`);
141-
reject(err);
142-
});
143-
144-
this.#producer.once('connection.failure', (err) => {
178+
this.#producer.once('connection.failure', (error) => {
145179
this.#isProducerConnected = false;
146180
console.error(
147-
`Kafka producer connection resulted in failure: ${err}`,
181+
`Kafka producer connection resulted in failure: ${error}`,
148182
);
149-
reject(err);
183+
reject(error);
150184
});
151185
});
152186
}, retryOptions);
@@ -165,7 +199,6 @@ class KafkaClient {
165199
return new Promise((resolve, reject) => {
166200
// Remove any previously attached listeners for these events
167201
this.#consumer.removeAllListeners('ready');
168-
this.#consumer.removeAllListeners('event.error');
169202
this.#consumer.removeAllListeners('connection.failure');
170203

171204
this.#consumer.connect();
@@ -175,24 +208,17 @@ class KafkaClient {
175208
console.log('Kafka consumer successfully connected');
176209

177210
// Once consumer is connected, remove error listeners to avoid handling late errors
178-
this.#consumer.removeAllListeners('event.error');
179211
this.#consumer.removeAllListeners('connection.failure');
180212

181213
resolve();
182214
});
183215

184-
this.#consumer.once('event.error', (err) => {
185-
this.#isConsumerConnected = false;
186-
console.error(`Kafka consumer connection error: ${err}`);
187-
reject(err);
188-
});
189-
190-
this.#consumer.once('connection.failure', (err) => {
216+
this.#consumer.once('connection.failure', (error) => {
191217
this.#isConsumerConnected = false;
192218
console.error(
193-
`Kafka consumer connection resulted in failure: ${err}`,
219+
`Kafka consumer connection resulted in failure: ${error}`,
194220
);
195-
reject(err);
221+
reject(error);
196222
});
197223
});
198224
}, retryOptions);
@@ -213,7 +239,7 @@ class KafkaClient {
213239
}
214240
} catch (error) {
215241
console.error(
216-
`Error occurred connecting to Kafka producer: ${error.message}`,
242+
`Kafka producer connection failed with error ${error.message}. Max retries reached. Exiting...`,
217243
);
218244
process.exit(1);
219245
}
@@ -231,7 +257,7 @@ class KafkaClient {
231257
}
232258
} catch (error) {
233259
console.error(
234-
`Error occurred connecting to Kafka consumer: ${error.message}`,
260+
`Kafka consumer connection failed with error ${error.message}. Max retries reached. Exiting...`,
235261
);
236262
process.exit(1);
237263
}
@@ -321,21 +347,28 @@ class KafkaClient {
321347
async disconnectProducer() {
322348
try {
323349
if (this.#isProducerConnected) {
324-
return new Promise((resolve) => {
325-
this.#producer.disconnect();
326-
327-
this.#producer.once('disconnected', () => {
328-
this.#isProducerConnected = false;
329-
this.#producer.removeAllListeners();
330-
331-
console.log('Successfully disconnected Kafka producer');
332-
resolve();
350+
await backOff(() => {
351+
return new Promise((resolve, reject) => {
352+
this.#producer.disconnect((error) => {
353+
console.error(`Error occurred disconnecting producer: ${error}`);
354+
reject(error);
355+
});
356+
357+
this.#producer.once('disconnected', () => {
358+
this.#isProducerConnected = false;
359+
this.#producer.removeAllListeners();
360+
361+
console.log('Successfully disconnected Kafka producer');
362+
resolve();
363+
});
333364
});
334-
});
365+
}, retryOptions);
335366
}
336367
} catch (error) {
337-
console.error(`Error disconnecting Kafka producer: ${error}`);
338-
throw new Error(`Error disconnecting Kafka producer: ${error}`);
368+
console.error(
369+
`Kafka producer disconnection failed with error ${error.message}. Max retries reached. Exiting...`,
370+
);
371+
process.exit(1);
339372
}
340373
}
341374

@@ -346,26 +379,34 @@ class KafkaClient {
346379
async disconnectConsumer() {
347380
try {
348381
if (this.#isConsumerConnected) {
349-
return new Promise((resolve) => {
350-
this.#consumer.disconnect();
351-
352-
this.#consumer.once('disconnected', () => {
353-
this.#isConsumerConnected = false;
354-
this.#consumer.removeAllListeners();
355-
356-
clearInterval(this.#intervalId);
357-
this.#intervalId = null;
358-
359-
console.log('Successfully disconnected Kafka consumer');
360-
resolve();
382+
await backOff(() => {
383+
return new Promise((resolve, reject) => {
384+
this.#consumer.disconnect((error) => {
385+
console.error(`Error occurred disconnecting consumer: ${error}`);
386+
reject(error);
387+
});
388+
389+
this.#consumer.once('disconnected', () => {
390+
this.#isConsumerConnected = false;
391+
this.#consumer.removeAllListeners();
392+
393+
clearInterval(this.#intervalId);
394+
this.#intervalId = null;
395+
396+
console.log('Successfully disconnected Kafka consumer');
397+
resolve();
398+
});
361399
});
362-
});
400+
}, retryOptions);
363401
}
364402
} catch (error) {
365-
console.error(`Error disconnecting Kafka consumer: ${error}`);
366403
clearInterval(this.#intervalId);
367404
this.#intervalId = null;
368-
throw new Error(`Error disconnecting Kafka consumer: ${error}`);
405+
406+
console.error(
407+
`Kafka consumer disconnection failed with error ${error.message}. Max retries reached. Exiting...`,
408+
);
409+
process.exit(1);
369410
}
370411
}
371412
}

0 commit comments

Comments
 (0)