Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nestjstools/messaging-rabbitmq-extension",
"version": "3.2.0",
"version": "3.3.0",
"description": "Extension to handle messages and dispatch them over AMQP protocol",
"author": "Sebastian Iwanczyszyn",
"private": false,
Expand Down Expand Up @@ -51,7 +51,8 @@
"test:e2e": "node_modules/.bin/jest --config ./test/jest-e2e.json"
},
"dependencies": {
"rabbitmq-client": "^5.0.5"
"amqp-connection-manager": "^5.0.0",
"amqplib": "^0.10.9"
},
"peerDependencies": {
"@nestjs/common": "^10.x||^11.x",
Expand All @@ -68,6 +69,7 @@
"@nestjs/testing": "^11.0.0",
"@nestjstools/messaging": "^3.0.2",
"@semantic-release/github": "^11.0.1",
"@types/amqplib": "^0.10.7",
"@types/express": "^5.0.0",
"@types/jest": "^29.5.2",
"@types/node": "^20.3.1",
Expand Down
31 changes: 21 additions & 10 deletions src/channel/amqp.channel.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
import { Channel } from '@nestjstools/messaging';
import { RmqChannelConfig } from './rmq-channel.config';
import { Connection } from 'rabbitmq-client';
import { RmqChannelConfig as ExtensionAmqpChannelConfig } from './rmq-channel.config';
import {
AmqpConnectionManager,
ChannelWrapper,
connect,
} from 'amqp-connection-manager';

export class AmqpChannel extends Channel<
RmqChannelConfig
> {
public readonly connection: Connection;
public readonly config: RmqChannelConfig;
export class AmqpChannel extends Channel<ExtensionAmqpChannelConfig> {
public connection: AmqpConnectionManager;
public readonly config: ExtensionAmqpChannelConfig;

constructor(config: RmqChannelConfig) {
constructor(config: ExtensionAmqpChannelConfig) {
super(config);
this.connection = new Connection(config.connectionUri);
this.config = config;
this.connection = connect(this.config.connectionUri, {
reconnectTimeInSeconds: 5,
heartbeatIntervalInSeconds: 30,
});
}

createChannelWrapper(): ChannelWrapper {
return this.connection.createChannel();
}

async onChannelDestroy(): Promise<void> {
if (!this.connection) return;
await this.connection.close();
return Promise.resolve();
this.connection = undefined;
}
}
107 changes: 64 additions & 43 deletions src/consumer/rabbitmq-messaging.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,99 @@ import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
import { IMessagingConsumer } from '@nestjstools/messaging';
import { ConsumerMessageDispatcher } from '@nestjstools/messaging';
import { ConsumerMessage } from '@nestjstools/messaging';
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { MessageConsumer } from '@nestjstools/messaging';
import { ConsumerDispatchedMessageError } from '@nestjstools/messaging';
import { RabbitmqMigrator } from '../migrator/rabbitmq.migrator';
import { Buffer } from 'buffer';
import { ChannelWrapper } from 'amqp-connection-manager';
import { Channel, ConsumeMessage, Options } from 'amqplib';

@Injectable()
@MessageConsumer(AmqpChannel)
export class RabbitmqMessagingConsumer
implements IMessagingConsumer<AmqpChannel>
implements IMessagingConsumer<AmqpChannel>, OnModuleDestroy
{
private channel?: AmqpChannel = undefined;
private amqpChannel: ChannelWrapper;

constructor(private readonly rabbitMqMigrator: RabbitmqMigrator) {}

async consume(
dispatcher: ConsumerMessageDispatcher,
channel: AmqpChannel,
): Promise<void> {
await this.rabbitMqMigrator.run(channel);
this.channel = channel;
await this.rabbitMqMigrator.run(channel);

if (!channel.connection) {
throw new Error('Brak aktywnego połączenia AMQP');
}

channel.connection.createConsumer(
{
queue: channel.config.queue,
queueOptions: { durable: true },
requeue: false,
},
async (msg): Promise<void> => {
const rabbitMqMessage = msg as RabbitMQMessage;
const channelWrapper = channel.createChannelWrapper();
await channelWrapper.waitForConnect();
this.amqpChannel = channelWrapper;

let message = rabbitMqMessage.body;
if (Buffer.isBuffer(message)) {
const messageContent = message.toString();
message = JSON.parse(messageContent);
}
await channelWrapper.addSetup(async (rawChannel: Channel) => {
return rawChannel.consume(
channel.config.queue,
async (msg: ConsumeMessage | null) => {
if (!msg) return;

const routingKey =
rabbitMqMessage.headers?.[RABBITMQ_HEADER_ROUTING_KEY] ??
rabbitMqMessage.routingKey;
let payload: unknown = msg.content;
if (Buffer.isBuffer(payload)) {
try {
payload = JSON.parse(payload.toString());
} catch {
rawChannel.nack(msg, false, false);
return;
}
}

dispatcher.dispatch(new ConsumerMessage(message, routingKey));
},
);
const routingKey: string =
(msg.properties.headers?.[RABBITMQ_HEADER_ROUTING_KEY] as
| string
| undefined) ?? msg.fields.routingKey;

return Promise.resolve();
if (dispatcher.isReady()) {
await dispatcher.dispatch(
new ConsumerMessage(payload as object, routingKey),
);
rawChannel.ack(msg);
} else {
rawChannel.nack(msg, false, true);
}
},
{ noAck: false },
);
});
}

async onError(
errored: ConsumerDispatchedMessageError,
channel: AmqpChannel,
): Promise<void> {
if (channel.config.deadLetterQueueFeature) {
const publisher = channel.connection.createPublisher();
const envelope = {
headers: {
'messaging-routing-key': errored.dispatchedConsumerMessage.routingKey,
},
exchange: 'dead_letter.exchange',
routingKey: `${channel.config.queue}_dead_letter`,
};
await publisher.send(envelope, errored.dispatchedConsumerMessage.message);
await publisher.close();
}
if (channel.config.deadLetterQueueFeature && this.amqpChannel) {
const exchange = 'dead_letter.exchange';
const routingKey = `${channel.config.queue}_dead_letter`;

return Promise.resolve();
await this.amqpChannel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(errored.dispatchedConsumerMessage.message)),
{
headers: {
[RABBITMQ_HEADER_ROUTING_KEY]:
errored.dispatchedConsumerMessage.routingKey,
},
} as Options.Publish,
);
}
}
}

interface RabbitMQMessage {
contentType: string;
body: object;
routingKey: string;
headers: { [key: string]: string };
async onModuleDestroy(): Promise<void> {
if (this.channel?.connection) {
await this.channel.connection.close();
}
this.channel = undefined;
}
}
47 changes: 31 additions & 16 deletions src/message-bus/amqp-message.bus.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { RoutingMessage } from '@nestjstools/messaging';
import { IMessageBus } from '@nestjstools/messaging';
import { Injectable } from '@nestjs/common';
import { Connection } from 'rabbitmq-client';
import { AmqpMessageOptions } from '../message/amqp-message-options';
import { AmqpChannel } from '../channel/amqp.channel';
import { AmqpMessageOptions } from '../message/amqp-message-options';
import { AmqpMessageBuilder } from './amqp-message.builder';
import { RABBITMQ_HEADER_ROUTING_KEY } from '../const';
import { ExchangeType } from '../channel/rmq-channel.config';
import { ChannelWrapper } from 'amqp-connection-manager';

@Injectable()
export class AmqpMessageBus implements IMessageBus {
private readonly connection: Connection;
public publisherChannel?: ChannelWrapper;

constructor(private readonly amqpChanel: AmqpChannel) {
this.connection = amqpChanel.connection;
}
constructor(private readonly amqpChannel: AmqpChannel) {}

async dispatch(message: RoutingMessage): Promise<object | void> {
if (!this.publisherChannel && this.amqpChannel.connection) {
this.publisherChannel = this.amqpChannel.createChannelWrapper();
}

if (
message.messageOptions !== undefined &&
!(message.messageOptions instanceof AmqpMessageOptions)
Expand All @@ -37,9 +39,21 @@ export class AmqpMessageBus implements IMessageBus {
);

const amqpMessage = messageBuilder.buildMessage();
const publisher = await this.connection.createPublisher();
await publisher.send(amqpMessage.envelope, amqpMessage.message);
await publisher.close();

await this.publisherChannel.publish(
amqpMessage.envelope.exchange,
amqpMessage.envelope.routingKey,
Buffer.from(JSON.stringify(amqpMessage.message)),
{
headers: amqpMessage.envelope.headers,
},
);
}

async initPublisherChannel() {
if (!this.publisherChannel && this.amqpChannel.connection) {
this.publisherChannel = await this.amqpChannel.connection.createChannel();
}
}

private createMessageBuilderWhenUndefined(
Expand All @@ -49,13 +63,13 @@ export class AmqpMessageBus implements IMessageBus {

messageBuilder
.withMessage(message.message)
.withExchangeName(this.amqpChanel.config.exchangeName);
.withExchangeName(this.amqpChannel.config.exchangeName);

if (this.amqpChanel.config.exchangeType === ExchangeType.DIRECT) {
if (this.amqpChannel.config.exchangeType === ExchangeType.DIRECT) {
messageBuilder.withRoutingKey(this.getRoutingKey(message));
}

if (this.amqpChanel.config.exchangeType === ExchangeType.TOPIC) {
if (this.amqpChannel.config.exchangeType === ExchangeType.TOPIC) {
messageBuilder.withRoutingKey(message.messageRoutingKey);
}

Expand All @@ -67,10 +81,11 @@ export class AmqpMessageBus implements IMessageBus {
): AmqpMessageBuilder {
const options = message.messageOptions as AmqpMessageOptions;
const messageBuilder = AmqpMessageBuilder.create();

messageBuilder
.withMessage(message.message)
.withExchangeName(
options.exchangeName ?? this.amqpChanel.config.exchangeName,
options.exchangeName ?? this.amqpChannel.config.exchangeName,
)
.withRoutingKey(options.routingKey ?? this.getRoutingKey(message))
.withHeaders(options.headers);
Expand All @@ -79,9 +94,9 @@ export class AmqpMessageBus implements IMessageBus {
}

private getRoutingKey(message: RoutingMessage): string {
return this.amqpChanel.config.bindingKeys !== undefined
? this.amqpChanel.config.bindingKeys.length > 0
? this.amqpChanel.config.bindingKeys[0]
return this.amqpChannel.config.bindingKeys !== undefined
? this.amqpChannel.config.bindingKeys.length > 0
? this.amqpChannel.config.bindingKeys[0]
: message.messageRoutingKey
: message.messageRoutingKey;
}
Expand Down
Loading