Skip to content
This repository was archived by the owner on Jul 13, 2023. It is now read-only.
Open

V3 #5

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"presets": [
[
"@babel/preset-env", {
"targets": {
"node": "current"
}
}
]
],
"plugins": ["babel-plugin-transform-object-rest-spread", "@babel/plugin-transform-classes"]
}
41 changes: 23 additions & 18 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
module.exports = {
"env": {
"es6": true,
"node": true,
"mocha": true
parser: 'babel-eslint',
parserOptions: {
sourceType: "module",
},
"extends": "eslint:recommended",
"rules": {
"indent": [
"error",
env: {
'es6': true,
'node': true,
'mocha': true
},
extends: 'eslint:recommended',
rules: {
indent: [
'error',
2
],
"linebreak-style": [
"error",
"unix"
'linebreak-style': [
'error',
'unix'
],
quotes: [
'error',
'single'
],
"quotes": [
"error",
"single"
semi: [
'error',
'always'
],
"semi": [
"error",
"always"
]
"no-func-assign": "warn"
}
};
84 changes: 41 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,73 @@
Node js Rabbit MQ client which has connection management backed into it.
This project is written on top of [amqp-connection-manager](https://github.com/benbria/node-amqp-connection-manager).

## NOTE

Version 3 is a major and breaking change from Version 2. Please use appropriate version for your use.

## Features

* Automatically reconnect when your amqplib broker dies in a fire.
* Round-robin connections between multiple brokers in a cluster.
* If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
* Very un-opinionated library - a thin wrapper around amqplib.

## Configuration

```javascript
{
host: process.env.PUBSUB_RABBITMQ_SERVICE_HOST,
port: process.env.PUBSUB_RABBITMQ_SERVICE_PORT_AMQP || 5672,
username: process.env.RABBITMQ_USERNAME,
password: process.env.RABBITMQ_PASSWORD,
prefetch: process.env.PREFETCH_JOBS || 2,
vhost: process.env.VHOST || '/',
heartbeatInterval: process.env.HEARTBEAT || 5,
reconnectTime: process.env.RECONNECT_TIME || 10,
protocol: process.env.RABBITMQ_PROTOCOL || 'amqp',
defaultQueueFeatures: { durable: true },
options: {
// options.findServers(callback) is a function which returns one or more servers to connect to. This should return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism such as Consul or etcd. Instead of taking a callback, this can also return a Promise. Note that if this is supplied, then urls is ignored.
findServers,
// options.connectionOptions is passed as options to the amqplib connect method.
connectionOptions
}
}
```

## Usage

Using yarn: `yarn add node-rabbitmq-client`
Using npm: `npm install node-rabbitmq-client`
Using yarn: `yarn add node-rabbitmq-client@3.0.0` </br> OR
Using npm: `npm install node-rabbitmq-client@3.0.0`

```javascript
import RabbitMQClient from 'node-rabitmq-client';
// (OR)
const RabbitMQClient = require('node-rabbitmq-client');

/**
* options is the object which is passed to consume at the time of initialization
*
* options = {
* queue: {
* name: 'some-queue-name'
* }
* }
*/

const { publish, consume, purgeQueue, ackAll } = RabbitMQClient;
// instantiate a client object
const client = new RabbitMQClient(config);

/* to publish a message */
publish({ queue: { name: 'some name' } }, data);
`data` is JS object
// `data` is JS object
client.publish({ queue: { name: 'some name' } }, data);

/* to consume from a queue */
consume({ queue: { name: 'some name' } }, promiseHandler);
client.consume({ queue: { name: 'some name' } }, promiseHandler);

/* to purge a queue */
purge({ queue: { name: 'some name' } });
client.purge({ queue: { name: 'some name' } });

/* to ack all messages */
ackAll();
client.ackAll();
```

## Please read this for implementing consume

* `promiseHandler` for consume should always return a resolved Promise even if some operations on the received message fails.
* When returning a resolved Promise, parameters need not be passed to it.If passed, these are simply ignored.
* Best practice is to implement a catch handler for the `promiseHandler` and push to some other queue and return a resolved Promise from there.
* If parsing the JSON message fails while consuming, this will try to push this error to another queue `parsingErrors`. So, if this failure is to be handled and noted, provide `parsingErrors` queue in the same config. (This is optional. Whether or not queue is provided, the error will be logged);
* If parsing the JSON message fails while consuming, a rejected promise is thrown and needs to be handled appropriately (This is optional. Whether or not queue is provided, the error will be logged);
* `promiseHandler` gets the message and the options that were passed to consume intially

```javascript
Expand All @@ -73,7 +93,7 @@ promiseFunction(message, options)
/* once processing the message is successful, return resolved promise */
/* if status queue is provided and success should be recorded */
if (statusQueue && recordSuccess) {
publish(statusQueue, {
client.publish(statusQueue, {
status: 'success',
queueName,
message: data
Expand All @@ -86,7 +106,7 @@ promiseFunction(message, options)
.catch(error => {
if (statusQueue && recordError) {
/* if status queue is provided and failure should be recorded */
publish(statusQueue, {
client.publish(statusQueue, {
status: 'error',
queueName,
error,
Expand All @@ -102,25 +122,3 @@ promiseFunction(message, options)
return Promise.resolve();
});
```

## Configuration

By default, this looks at `/config/env/${NODE_ENV}` file for rabbitMQ configuration
If configuration is not found here, then it looks at `/src/config/env/${NODE_ENV}`

```javascript
config = {
rabbitMQ: {
host: process.env.PUBSUB_RABBITMQ_SERVICE_HOST,
port: process.env.PUBSUB_RABBITMQ_SERVICE_PORT_AMQP || 5672,
username: process.env.RABBITMQ_USERNAME,
password: process.env.RABBITMQ_PASSWORD,
prefetch: process.env.PREFETCH_JOBS || 2,
vhost: process.env.VHOST || '/',
heartbeatInterval: process.env.HEARTBEAT || 5,
reconnectTime: process.env.RECONNECT_TIME || 10,
protocol: process.env.RABBITMQ_PROTOCOL || 'amqp',
defaultQueueFeatures: { durable: true }
}
}
```
4 changes: 2 additions & 2 deletions config/env/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ const config = {
rabbitMQ: {
host: process.env.PUBSUB_RABBITMQ_SERVICE_HOST || 'localhost',
port: process.env.PUBSUB_RABBITMQ_SERVICE_PORT_AMQP || 5672,
username: process.env.RABBITMQ_USERNAME || 'user',
password: process.env.RABBITMQ_PASSWORD || 'user',
username: process.env.RABBITMQ_USERNAME || 'guest',
password: process.env.RABBITMQ_PASSWORD || 'guest',
prefetch: process.env.PREFETCH_JOBS || 2,
protocol: 'amqp',
heartbeatInterval: 5,
Expand Down
33 changes: 33 additions & 0 deletions config/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const fs = require('fs');

const env = process.env.NODE_ENV || 'test';
const config = {};

/*
Base configurations
*/
fs.readdirSync(__dirname)
.filter(f => !f.includes('index.js') && !f.includes('env'))
.forEach((filename) => {
// eslint-disable-next-line
Object.assign(config, require(`./${filename}`));
});

/*
Environment configuration overrides
*/
const envDir = `${__dirname}/env`;
const isEnvExisting = fs.existsSync(envDir);

if (isEnvExisting) {
fs.readdirSync(envDir).forEach((filename) => {
const basename = filename.split('.')[0];

if (env === basename) {
// eslint-disable-next-line
Object.assign(config, require(`./env/${filename}`))
}
});
}

module.exports = config;
32 changes: 19 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
{
"name": "node-rabbitmq-client",
"version": "2.0.2",
"version": "3.0.2",
"description": "Auto-reconnect and round robin support for amqplib.",
"author": "Kishore <[email protected]>",
"licenses": [{
"type": "MIT",
"url": "https://raw.github.com/avdkishore/node-rabbitmq-client/master/LICENSE"
}],
"licenses": [
{
"type": "MIT",
"url": "https://raw.github.com/avdkishore/node-rabbitmq-client/master/LICENSE"
}
],
"homepage": "https://github.com/avdkishore/node-rabbitmq-client",
"main": "lib/index.js",
"scripts": {
"build": "yarn clean && yarn test && babel -s -d lib src",
"build": "yarn clean && yarn test && babel -s -d lib src/index.js && yarn lint:fix",
"clean": "rm -rf lib coverage .nyc_output",
"test": "export NODE_ENV=test && yarn test:lint && mocha",
"test": "export NODE_ENV=test && yarn test:lint && mocha --require @babel/register",
"test:lint": "eslint src test",
"lint:fix": "eslint --fix src test",
"prepare": "yarn build"
},
"dependencies": {
"amqp-connection-manager": "^2.2.0",
"amqplib": "^0.5.2"
},
"devDependencies": {
"babel-cli": "^6.26.0",
"babel-core": "^6.26.3",
"babel-preset-env": "^1.6.1",
"babel-register": "^6.26.0",
"@babel/cli": "^7.7.4",
"@babel/core": "^7.7.4",
"@babel/plugin-transform-classes": "^7.7.4",
"@babel/preset-env": "^7.7.4",
"@babel/register": "^7.7.4",
"babel-eslint": "^10.0.3",
"babel-plugin-transform-object-rest-spread": "^7.0.0-beta.3",
"chai": "^4.2.0",
"eslint": "4",
"mocha": "^5.2.0"
Expand All @@ -40,6 +46,6 @@
"amqplib"
],
"files": [
"lib"
]
"lib"
]
}
Loading