-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
120 lines (108 loc) · 2.99 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
'use strict'
const Hp = require('hemera-plugin')
const Rabbit = require('rabbot')
function hemeraRabbitmq(hemera, opts) {
const Joi = hemera.joi
const topic = 'rabbitmq'
return Rabbit.configure(opts.rabbot)
.then(() => {
// Sends all unhandled messages back to the queue.
Rabbit.nackUnhandled()
// after this call, any new callbacks attached via handle will be wrapped in a try/catch
// that nacks the message on an error
Rabbit.nackOnError()
hemera.decorate('rabbitmq', {
addPubSubProxy: opts => publisher(opts),
addRequestProxy: opts => requestor(opts)
})
function publisher(consOpts) {
Rabbit.handle(consOpts, msg => {
let pattern = {
pubsub$: true,
topic: `${topic}.${consOpts.type}`,
data: msg.body
}
if (typeof consOpts.pattern === 'object') {
pattern = Object.assign(consOpts.pattern, pattern)
}
hemera.act(pattern, err => {
if (err) {
msg.nack()
return
}
msg.ack()
})
})
}
function requestor(consOpts) {
Rabbit.handle(consOpts, msg => {
let pattern = {
topic: `${topic}.${consOpts.type}`,
data: msg.body
}
if (typeof consOpts.pattern === 'object') {
pattern = Object.assign(consOpts.pattern, pattern)
}
hemera.act(pattern, (err, req) => {
if (err) {
msg.nack()
return
}
msg.ack()
msg.reply(req)
})
})
}
hemera.add(
{
topic,
cmd: 'publish',
exchange: Joi.string().required(),
options: Joi.object().required(),
data: Joi.object().required()
},
function(req) {
return Rabbit.publish(
req.exchange,
Object.assign({ body: req.data }, req.options)
)
.then(() => true)
.catch(err => convertRabbotErr(err))
}
)
hemera.add(
{
topic,
cmd: 'request',
exchange: Joi.string().required(),
options: Joi.object().required(),
data: Joi.object().required()
},
function(req) {
return Rabbit.request(
req.exchange,
Object.assign({ body: req.data }, req.options)
)
.then(response => {
response.ack()
return response.body
})
.catch(err => convertRabbotErr(err))
}
)
})
.catch(err => convertRabbotErr(err))
}
function convertRabbotErr(err) {
if (typeof err === 'string') {
return Promise.reject(new Error('rabbot: ' + err))
}
return err
}
const plugin = Hp(hemeraRabbitmq, {
hemera: '^5.0.0',
name: require('./package.json').name,
dependencies: ['hemera-joi'],
decorators: ['joi']
})
module.exports = plugin