Skip to content

Commit

Permalink
RPC message pattern setup #40
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya committed Mar 1, 2022
1 parent 56ae7be commit d6a4df5
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
56 changes: 56 additions & 0 deletions weather-data-ingestor-microservice/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
console.log("Usage: rpc_client.js num");
process.exit(1);
}

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
channel.assertQueue('', {
exclusive: true
}, function(error2, q) {
if (error2) {
throw error2;
}
var correlationId = generateUuid();
var num = parseInt(args[0]);

console.log(' [x] Requesting fib(%d)', num);

channel.consume(q.queue, function(msg) {
if (msg.properties.correlationId == correlationId) {
console.log(' [.] Got %s', msg.content.toString());
setTimeout(function() {
connection.close();
process.exit(0)
}, 500);
}
}, {
noAck: true
});
let date='02-22-22'; let time='6AM';
let data = {date, time};
channel.sendToQueue('rpc_queue',
// Buffer.from(num.toString()),{

Buffer.from( data.toString()),{
correlationId: correlationId,
replyTo: q.queue });
});
});
});

function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
1 change: 1 addition & 0 deletions weather-data-ingestor-microservice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"author": "anbadrin",
"license": "ISC",
"dependencies": {
"amqplib": "^0.8.0",
"aws-sdk": "^2.1066.0",
"express": "^4.17.2"
},
Expand Down
58 changes: 55 additions & 3 deletions weather-data-ingestor-microservice/server.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,64 @@

const express = require('express')
const routes = require('./api/routes');


//init express
const app = express()
const port = 3001
app.use(express.json());

routes(app);
// routes(app);

app.listen(port, () => {
console.log(`Server is listening on port ${port}`)
})
console.log(`Server is listening on port ${port}`)
})


// amqplib is a protocol for messaging . So use that
var amqp = require('amqplib/callback_api');
//connect to the rabitmq server

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
// create channel ,establiish connection and declare the queue
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'rpc_queue';

channel.assertQueue(queue, {
durable: false
});
// to spread the load equally over multiple servers we need to set the prefetch setting on the channel
channel.prefetch(1);
console.log(' [x] Awaiting RPC requests');
channel.consume(queue, function reply(msg) {

// var n = parseInt(msg.content.toString());

// console.log(" [.] fib(%d)", n);

// var r = fibonacci(n); // function call. need to check with anita
console.log("msg"+msg.content.toJSON());
var r= msg.content;
console.log(r.toJSON());
channel.sendToQueue(msg.properties.replyTo,
Buffer.from(r.toString()), {
correlationId: msg.properties.correlationId
});

channel.ack(msg);
});
});
});

// function fibonacci(n) {
// if (n == 0 || n == 1)
// return n;
// else
// return fibonacci(n - 1) + fibonacci(n - 2);
// }

0 comments on commit d6a4df5

Please sign in to comment.