Skip to content

Commit db9b134

Browse files
committed
chore(): initial commit
0 parents  commit db9b134

9 files changed

+285
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.DS_Store
2+
node_modules/

.jscsrc

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{
2+
"validateIndentation": "\t",
3+
4+
"requireSpaceAfterKeywords": [
5+
"if",
6+
"else",
7+
"for",
8+
"while",
9+
"do",
10+
"switch",
11+
"case",
12+
"return",
13+
"try",
14+
"catch",
15+
"typeof"
16+
],
17+
18+
"requireSpacesInNamedFunctionExpression": {
19+
"beforeOpeningCurlyBrace": true
20+
},
21+
"disallowSpacesInNamedFunctionExpression": {
22+
"beforeOpeningRoundBrace": true
23+
},
24+
"requireSpacesInAnonymousFunctionExpression": {
25+
"beforeOpeningRoundBrace": true,
26+
"beforeOpeningCurlyBrace": true
27+
},
28+
"validateParameterSeparator": ", ",
29+
30+
"disallowMultipleVarDecl": true,
31+
32+
"requireSpacesInConditionalExpression": true,
33+
"disallowYodaConditions": true,
34+
35+
"disallowSpacesInsideParentheses": true,
36+
"requireSpacesInsideObjectBrackets": "all",
37+
"disallowSpaceAfterObjectKeys": true,
38+
"requireSpaceBeforeObjectValues": true,
39+
"requireSpacesInsideArrayBrackets": "all",
40+
"disallowTrailingComma": true,
41+
42+
"requireSpaceAfterLineComment": true,
43+
44+
"requireSpaceBeforeBlockStatements": true,
45+
"disallowNewlineBeforeBlockStatements": true,
46+
"disallowPaddingNewlinesInBlocks": true,
47+
48+
"disallowTrailingWhitespace": true,
49+
"validateLineBreaks": "LF"
50+
}

.jshintrc

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"camelcase": true,
3+
"devel": true,
4+
"immed": true,
5+
"latedef": true,
6+
"maxlen": 120,
7+
"node": true,
8+
"nonbsp": true,
9+
"trailing": true,
10+
"undef": true,
11+
"unused": true,
12+
"white": true,
13+
"globals": {
14+
"angular": true,
15+
"describe": true,
16+
"it": true,
17+
"expect": true,
18+
"beforeEach": true
19+
}
20+
}

amqp.js

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
var _ = require('lodash');
2+
var amqplib = require('amqplib');
3+
var format = require('util').format;
4+
5+
function AMQP(config) {
6+
var uri = config;
7+
8+
if (_.isObject(uri)) {
9+
_.defaults(uri, {
10+
user: '',
11+
password: '',
12+
host: '',
13+
vhost: ''
14+
});
15+
16+
uri = format(
17+
'amqp://%s:%s@%s/%s',
18+
uri.user,
19+
uri.password,
20+
uri.host,
21+
uri.vhost
22+
);
23+
}
24+
25+
if (!_.isString(uri) || !uri) throw new Error('Malformed config');
26+
27+
return amqplib.connect(uri).then(function (conn) {
28+
return conn.createChannel();
29+
});
30+
}
31+
32+
module.exports = AMQP;

consumer.js

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
function Consumer(amqp) {
2+
if (!this.domain) throw new Error('Missing domain property');
3+
if (!amqp) throw new Error('Missing amqp connection');
4+
5+
this.amqp = amqp;
6+
7+
this.consume(this.domain, this.handleEvent.bind(this));
8+
}
9+
10+
Consumer.prototype.connect = function (callback) {
11+
var self = this;
12+
this.amqp.then(function (channel) {
13+
self.channel = channel;
14+
callback(channel);
15+
});
16+
};
17+
18+
Consumer.prototype.consume = function (queue, callback) {
19+
var self = this;
20+
21+
this.channel.assertQueue(queue);
22+
23+
this.channel.consume(queue, function (msg) {
24+
self.handleMessage(msg, callback);
25+
});
26+
};
27+
28+
Consumer.prototype.handleMessage = function (msg, callback) {
29+
var data = JSON.parse(msg.content.toString());
30+
callback(data, this.onProcessed.bind(this));
31+
};
32+
33+
Consumer.prototype.handleEvent = function (event, callback) {
34+
if (!event.type) throw new Error('Missing event type');
35+
if (!this.projections[event.type]) throw new Error('No projections for ' + event.type);
36+
this.projections[event.type].bind(this)(event.data || event.args || {}, callback);
37+
};
38+
39+
Consumer.prototype.onProcessed = function (err, results) {
40+
this.channel.ack(msg);
41+
42+
if (!msg.properties.replyTo) return;
43+
44+
var reply = { err: err, results: results };
45+
var rpcConfig = { correlationId: msg.properties.correlationId };
46+
47+
this.channel.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(reply)), rpcConfig);
48+
};
49+
50+
module.exports = Consumer;

event-sourcing.js

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
var consumer = require('consumer');
2+
var publisher = require('publisher');
3+
4+
module.exports = {
5+
consumer: consumer,
6+
publisher: publisher
7+
};

mongodb.js

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
var _ = require('lodash');
2+
var format = require('util').format;
3+
var MongoClient = require('mongodb').MongoClient;
4+
5+
function MongoDB(config) {
6+
this.uri = config;
7+
8+
if (_.isObject(this.uri)) {
9+
_.defaults(this.uri, {
10+
host: '',
11+
port: '',
12+
db: ''
13+
});
14+
15+
this.uri = format(
16+
'mongodb://%s:%s/%s',
17+
this.uri.host,
18+
this.uri.port,
19+
this.uri.db
20+
);
21+
}
22+
23+
if (!_.isString(this.uri) || !this.uri) throw new Error('Malformed config');
24+
}
25+
26+
MongoDB.prototype.connect = function (callback) {
27+
if (this.db) return callback(null, this.db);
28+
29+
var self = this;
30+
MongoClient.connect(this.uri, function (err, db) {
31+
if (err) throw err;
32+
self.db = db;
33+
callback(err, db);
34+
});
35+
};
36+
37+
module.exports = MongoDB;

package.json

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "event-sourcing",
3+
"version": "0.0.1",
4+
"description": "NodeJS Event-sourcing POC",
5+
"main": "event-sourcing.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"repository": {
10+
"type": "git",
11+
"url": "https://github.com/marcghorayeb/event-sourcing"
12+
},
13+
"author": "Marc Ghorayeb <[email protected]>",
14+
"license": "ISC",
15+
"bugs": {
16+
"url": "https://github.com/marcghorayeb/event-sourcing/issues"
17+
},
18+
"homepage": "https://github.com/marcghorayeb/event-sourcing",
19+
"dependencies": {
20+
"amqplib": "^0.3.2",
21+
"lodash": "^3.7.0",
22+
"mongodb": "^2.0.28",
23+
"node-uuid": "^1.4.3"
24+
}
25+
}

publisher.js

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
var amqp = require('./amqp');
2+
var uuid = require('node-uuid');
3+
4+
function Publisher(amqp, db) {
5+
this.amqp = amqp;
6+
7+
var self = this;
8+
db.collection('events', function (err, col) {
9+
self.col = col;
10+
});
11+
}
12+
13+
Publisher.prototype.connect = function (callback) {
14+
var self = this;
15+
this.amqp.then(function (channel) {
16+
self.channel = channel;
17+
callback(channel);
18+
});
19+
};
20+
21+
Publisher.prototype.askRPC = function (queue, event, callback) {
22+
var self = this;
23+
this.persistEvent(event, function (err) {
24+
if (err) throw err;
25+
self.emitEvent(queue, event, callback);
26+
});
27+
};
28+
29+
Publisher.prototype.persistEvent = function (event, callback) {
30+
this.col.insertOne(event, function (err, results) {
31+
if (err) throw err;
32+
callback(err, results);
33+
});
34+
};
35+
36+
Publisher.prototype.assertReplyQueue = function () {
37+
return this.channel.assertQueue('', { exclusive: true }).then(function (queueAssertion) {
38+
return queueAssertion.queue;
39+
});
40+
};
41+
42+
Publisher.prototype.emitEvent = function (queue, event, callback) {
43+
event = new Buffer(JSON.stringify(event));
44+
45+
if (!callback) return this.channel.sendToQueue(queue, event);
46+
47+
var self = this;
48+
var rpcConfig = { correlationId: uuid.v4(), replyTo: null };
49+
var rpcCallback = function (msg) {
50+
if (msg.properties.correlationId !== rpcConfig.correlationId) return;
51+
callback(msg);
52+
};
53+
54+
this.assertReplyQueue().then(function (responseQ) {
55+
self.channel.consume(responseQ, rpcCallback, { noAck: true }).then(function () {
56+
rpcConfig.replyTo = responseQ;
57+
self.channel.sendToQueue(queue, event, rpcConfig);
58+
});
59+
});
60+
};
61+
62+
module.exports = Publisher;

0 commit comments

Comments
 (0)