Skip to content

Commit a9f7d57

Browse files
change library for supporting rabbitmq (#13)
* v3 * v3 * feat: BREAKING CHANGE: change library for supporting rabbitmq * apply changes * apply changes
1 parent 91d27ef commit a9f7d57

File tree

8 files changed

+150
-128
lines changed

8 files changed

+150
-128
lines changed

package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nestjstools/messaging-rabbitmq-extension",
3-
"version": "2.13.0",
3+
"version": "3.0.0",
44
"description": "Extension to handle messages and dispatch them over AMQP protocol",
55
"author": "Sebastian Iwanczyszyn",
66
"private": false,
@@ -51,12 +51,12 @@
5151
"test:e2e": "node_modules/.bin/jest --config ./test/jest-e2e.json"
5252
},
5353
"dependencies": {
54-
"rabbitmq-client": "^5.0.4"
54+
"amqplib": "^0.10.9"
5555
},
5656
"peerDependencies": {
5757
"@nestjs/common": "^10.x||^11.x",
5858
"@nestjs/core": "^10.x||^11.x",
59-
"@nestjstools/messaging": "^2.X",
59+
"@nestjstools/messaging": "^3.x",
6060
"reflect-metadata": "^0.2.0",
6161
"rxjs": "^7.x"
6262
},
@@ -66,7 +66,7 @@
6666
"@nestjs/core": "^11.0.0",
6767
"@nestjs/schematics": "^11.0.0",
6868
"@nestjs/testing": "^11.0.0",
69-
"@nestjstools/messaging": "^2.21.0",
69+
"@nestjstools/messaging": "^3.0.0",
7070
"@semantic-release/github": "^11.0.1",
7171
"@types/express": "^5.0.0",
7272
"@types/jest": "^29.5.2",

src/channel/amqp-channel.factory.ts

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/channel/amqp.channel.ts

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,41 @@
1-
import { AmqpChannelConfig, Channel } from '@nestjstools/messaging';
1+
import { Channel } from '@nestjstools/messaging';
22
import { RmqChannelConfig as ExtensionAmqpChannelConfig } from './rmq-channel.config';
3-
import { Connection } from 'rabbitmq-client';
3+
import * as amqp from 'amqplib';
44

5-
export class AmqpChannel extends Channel<
6-
AmqpChannelConfig | ExtensionAmqpChannelConfig
7-
> {
8-
public readonly connection: Connection;
9-
public readonly config: AmqpChannelConfig | ExtensionAmqpChannelConfig;
5+
export class AmqpChannel extends Channel<ExtensionAmqpChannelConfig> {
6+
public connection?: any;
7+
public channel?: any;
8+
public readonly config: ExtensionAmqpChannelConfig;
109

11-
constructor(config: AmqpChannelConfig | ExtensionAmqpChannelConfig) {
10+
constructor(config: ExtensionAmqpChannelConfig) {
1211
super(config);
13-
this.connection = new Connection(config.connectionUri);
12+
this.config = config;
13+
}
14+
15+
async init(): Promise<void> {
16+
if (this.connection && this.channel) {
17+
return Promise.resolve();
18+
}
19+
20+
this.connection = undefined;
21+
this.channel = undefined;
22+
23+
this.connection = await amqp.connect(this.config.connectionUri);
24+
this.channel = await this.connection.createChannel();
25+
26+
if (this.config.queue) {
27+
await this.channel.assertQueue(this.config.queue, { durable: true });
28+
}
1429
}
1530

1631
async onChannelDestroy(): Promise<void> {
17-
await this.connection.close();
18-
return Promise.resolve();
32+
if (this.channel) {
33+
await this.channel.close();
34+
this.channel = undefined;
35+
}
36+
if (this.connection) {
37+
await this.connection.close();
38+
this.connection = undefined;
39+
}
1940
}
2041
}

src/consumer/rabbitmq-messaging.consumer.ts

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
33
import { IMessagingConsumer } from '@nestjstools/messaging';
44
import { ConsumerMessageDispatcher } from '@nestjstools/messaging';
55
import { ConsumerMessage } from '@nestjstools/messaging';
6-
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
6+
import { Injectable, OnModuleDestroy } from '@nestjs/common';
77
import { MessageConsumer } from '@nestjstools/messaging';
88
import { ConsumerDispatchedMessageError } from '@nestjstools/messaging';
99
import { RabbitmqMigrator } from '../migrator/rabbitmq.migrator';
@@ -12,7 +12,7 @@ import { Buffer } from 'buffer';
1212
@Injectable()
1313
@MessageConsumer(AmqpChannel)
1414
export class RabbitmqMessagingConsumer
15-
implements IMessagingConsumer<AmqpChannel>
15+
implements IMessagingConsumer<AmqpChannel>, OnModuleDestroy
1616
{
1717
private channel?: AmqpChannel = undefined;
1818

@@ -22,53 +22,68 @@ export class RabbitmqMessagingConsumer
2222
dispatcher: ConsumerMessageDispatcher,
2323
channel: AmqpChannel,
2424
): Promise<void> {
25-
await this.rabbitMqMigrator.run(channel);
25+
await channel.init();
2626
this.channel = channel;
27+
await this.rabbitMqMigrator.run(channel);
2728

28-
channel.connection.createConsumer(
29-
{
30-
queue: channel.config.queue,
31-
queueOptions: { durable: true },
32-
requeue: false,
33-
},
34-
async (msg): Promise<void> => {
35-
const rabbitMqMessage = msg as RabbitMQMessage;
29+
const amqpChannel = channel.channel;
30+
if (!amqpChannel) {
31+
throw new Error('AMQP channel not initialized');
32+
}
33+
34+
await amqpChannel.consume(
35+
channel.config.queue,
36+
async (msg) => {
37+
if (!msg) return;
3638

37-
let message = rabbitMqMessage.body;
39+
let message: any = msg.content;
3840
if (Buffer.isBuffer(message)) {
39-
const messageContent = message.toString();
40-
message = JSON.parse(messageContent);
41+
message = JSON.parse(message.toString());
4142
}
4243

4344
const routingKey =
44-
rabbitMqMessage.headers?.[RABBITMQ_HEADER_ROUTING_KEY] ??
45-
rabbitMqMessage.routingKey;
45+
msg.properties.headers?.[RABBITMQ_HEADER_ROUTING_KEY] ??
46+
msg.fields.routingKey;
47+
48+
await dispatcher.dispatch(new ConsumerMessage(message, routingKey));
4649

47-
dispatcher.dispatch(new ConsumerMessage(message, routingKey));
50+
amqpChannel.ack(msg);
4851
},
52+
{ noAck: false },
4953
);
50-
51-
return Promise.resolve();
5254
}
5355

5456
async onError(
5557
errored: ConsumerDispatchedMessageError,
5658
channel: AmqpChannel,
5759
): Promise<void> {
58-
if (channel.config.deadLetterQueueFeature) {
59-
const publisher = channel.connection.createPublisher();
60-
const envelope = {
61-
headers: {
62-
'messaging-routing-key': errored.dispatchedConsumerMessage.routingKey,
60+
if (channel.config.deadLetterQueueFeature && channel.channel) {
61+
const exchange = 'dead_letter.exchange';
62+
const routingKey = `${channel.config.queue}_dead_letter`;
63+
64+
await channel.channel.assertExchange(exchange, 'direct', {
65+
durable: true,
66+
});
67+
68+
channel.channel.publish(
69+
exchange,
70+
routingKey,
71+
Buffer.from(JSON.stringify(errored.dispatchedConsumerMessage.message)),
72+
{
73+
headers: {
74+
'messaging-routing-key':
75+
errored.dispatchedConsumerMessage.routingKey,
76+
},
6377
},
64-
exchange: 'dead_letter.exchange',
65-
routingKey: `${channel.config.queue}_dead_letter`,
66-
};
67-
await publisher.send(envelope, errored.dispatchedConsumerMessage.message);
68-
await publisher.close();
78+
);
6979
}
80+
}
7081

71-
return Promise.resolve();
82+
async onModuleDestroy(): Promise<void> {
83+
if (this.channel) {
84+
await this.channel.connection.close();
85+
this.channel = undefined;
86+
}
7287
}
7388
}
7489

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
import { Envelope } from 'rabbitmq-client';
2-
31
export class AmqpMessageBuilder {
42
private constructor(
5-
private exchangeName: string = undefined,
6-
private routingKey: string = undefined,
7-
private headers: { [key: string]: string } = undefined,
8-
private message: object = undefined,
3+
private exchangeName?: string,
4+
private routingKey?: string,
5+
private headers?: { [key: string]: any },
6+
private message?: object,
97
) {}
108

119
static create(): AmqpMessageBuilder {
@@ -22,16 +20,15 @@ export class AmqpMessageBuilder {
2220
return this;
2321
}
2422

25-
withHeaders(headers: { [key: string]: string }): AmqpMessageBuilder {
23+
withHeaders(headers: { [key: string]: any }): AmqpMessageBuilder {
2624
this.headers = headers;
2725
return this;
2826
}
2927

30-
addHeader(key: string, value: string): AmqpMessageBuilder {
28+
addHeader(key: string, value: any): AmqpMessageBuilder {
3129
if (!this.headers) {
3230
this.headers = {};
3331
}
34-
3532
this.headers[key] = value;
3633
return this;
3734
}
@@ -42,30 +39,34 @@ export class AmqpMessageBuilder {
4239
}
4340

4441
buildMessage(): AmqpMessage {
45-
if (this.exchangeName === undefined) {
42+
if (!this.exchangeName) {
4643
throw new Error('Exchange name must be defined');
4744
}
4845

49-
if (this.routingKey === undefined) {
50-
throw new Error('RoutingKey name must be defined');
46+
if (!this.routingKey) {
47+
throw new Error('Routing key must be defined');
5148
}
5249

53-
if (this.message === undefined) {
54-
throw new Error('Message name must be defined');
50+
if (!this.message) {
51+
throw new Error('Message must be defined');
5552
}
5653

5754
return {
5855
message: this.message,
5956
envelope: {
6057
exchange: this.exchangeName,
6158
routingKey: this.routingKey,
62-
headers: this.headers,
59+
headers: this.headers ?? {},
6360
},
6461
};
6562
}
6663
}
6764

68-
interface AmqpMessage {
65+
export interface AmqpMessage {
6966
message: object;
70-
envelope: Envelope;
67+
envelope: {
68+
exchange: string;
69+
routingKey: string;
70+
headers?: { [key: string]: any };
71+
};
7172
}

src/message-bus/amqp-message.bus.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
import { RoutingMessage } from '@nestjstools/messaging';
22
import { IMessageBus } from '@nestjstools/messaging';
33
import { Injectable } from '@nestjs/common';
4-
import { Connection } from 'rabbitmq-client';
5-
import { AmqpMessageOptions } from '../message/amqp-message-options';
64
import { AmqpChannel } from '../channel/amqp.channel';
7-
import { ExchangeType } from '@nestjstools/messaging';
5+
import { AmqpMessageOptions } from '../message/amqp-message-options';
86
import { AmqpMessageBuilder } from './amqp-message.builder';
97
import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
8+
import { ExchangeType } from '../channel/rmq-channel.config';
109

1110
@Injectable()
1211
export class AmqpMessageBus implements IMessageBus {
13-
private readonly connection: Connection;
14-
15-
constructor(private readonly amqpChanel: AmqpChannel) {
16-
this.connection = amqpChanel.connection;
17-
}
12+
constructor(private readonly amqpChannel: AmqpChannel) {}
1813

1914
async dispatch(message: RoutingMessage): Promise<object | void> {
2015
if (
@@ -37,9 +32,19 @@ export class AmqpMessageBus implements IMessageBus {
3732
);
3833

3934
const amqpMessage = messageBuilder.buildMessage();
40-
const publisher = await this.connection.createPublisher();
41-
await publisher.send(amqpMessage.envelope, amqpMessage.message);
42-
await publisher.close();
35+
36+
if (!this.amqpChannel.channel) {
37+
throw new Error('AMQP channel not initialized. Did you call init()?');
38+
}
39+
40+
await this.amqpChannel.channel.publish(
41+
amqpMessage.envelope.exchange,
42+
amqpMessage.envelope.routingKey,
43+
Buffer.from(JSON.stringify(amqpMessage.message)),
44+
{
45+
headers: amqpMessage.envelope.headers,
46+
},
47+
);
4348
}
4449

4550
private createMessageBuilderWhenUndefined(
@@ -49,13 +54,13 @@ export class AmqpMessageBus implements IMessageBus {
4954

5055
messageBuilder
5156
.withMessage(message.message)
52-
.withExchangeName(this.amqpChanel.config.exchangeName);
57+
.withExchangeName(this.amqpChannel.config.exchangeName);
5358

54-
if (this.amqpChanel.config.exchangeType === ExchangeType.DIRECT) {
59+
if (this.amqpChannel.config.exchangeType === ExchangeType.DIRECT) {
5560
messageBuilder.withRoutingKey(this.getRoutingKey(message));
5661
}
5762

58-
if (this.amqpChanel.config.exchangeType === ExchangeType.TOPIC) {
63+
if (this.amqpChannel.config.exchangeType === ExchangeType.TOPIC) {
5964
messageBuilder.withRoutingKey(message.messageRoutingKey);
6065
}
6166

@@ -67,10 +72,11 @@ export class AmqpMessageBus implements IMessageBus {
6772
): AmqpMessageBuilder {
6873
const options = message.messageOptions as AmqpMessageOptions;
6974
const messageBuilder = AmqpMessageBuilder.create();
75+
7076
messageBuilder
7177
.withMessage(message.message)
7278
.withExchangeName(
73-
options.exchangeName ?? this.amqpChanel.config.exchangeName,
79+
options.exchangeName ?? this.amqpChannel.config.exchangeName,
7480
)
7581
.withRoutingKey(options.routingKey ?? this.getRoutingKey(message))
7682
.withHeaders(options.headers);
@@ -79,9 +85,9 @@ export class AmqpMessageBus implements IMessageBus {
7985
}
8086

8187
private getRoutingKey(message: RoutingMessage): string {
82-
return this.amqpChanel.config.bindingKeys !== undefined
83-
? this.amqpChanel.config.bindingKeys.length > 0
84-
? this.amqpChanel.config.bindingKeys[0]
88+
return this.amqpChannel.config.bindingKeys !== undefined
89+
? this.amqpChannel.config.bindingKeys.length > 0
90+
? this.amqpChannel.config.bindingKeys[0]
8591
: message.messageRoutingKey
8692
: message.messageRoutingKey;
8793
}

0 commit comments

Comments
 (0)