diff --git a/weather-data-ingestor-microservice/client.js b/weather-data-ingestor-microservice/client.js new file mode 100644 index 0000000..3c9fa3b --- /dev/null +++ b/weather-data-ingestor-microservice/client.js @@ -0,0 +1,56 @@ +var amqp = require('amqplib/callback_api'); + +var args = process.argv.slice(2); + +if (args.length == 0) { + console.log("Usage: rpc_client.js num"); + process.exit(1); +} + +amqp.connect('amqp://localhost', function(error0, connection) { + if (error0) { + throw error0; + } + connection.createChannel(function(error1, channel) { + if (error1) { + throw error1; + } + channel.assertQueue('', { + exclusive: true + }, function(error2, q) { + if (error2) { + throw error2; + } + var correlationId = generateUuid(); + var num = parseInt(args[0]); + + console.log(' [x] Requesting fib(%d)', num); + + channel.consume(q.queue, function(msg) { + if (msg.properties.correlationId == correlationId) { + console.log(' [.] Got %s', msg.content.toString()); + setTimeout(function() { + connection.close(); + process.exit(0) + }, 500); + } + }, { + noAck: true + }); + let date='02-22-22'; let time='6AM'; + let data = {date, time}; + channel.sendToQueue('rpc_queue', + // Buffer.from(num.toString()),{ + + Buffer.from( data.toString()),{ + correlationId: correlationId, + replyTo: q.queue }); + }); + }); +}); + +function generateUuid() { + return Math.random().toString() + + Math.random().toString() + + Math.random().toString(); +} \ No newline at end of file diff --git a/weather-data-ingestor-microservice/package.json b/weather-data-ingestor-microservice/package.json index 63ce9e5..20c8f6f 100644 --- a/weather-data-ingestor-microservice/package.json +++ b/weather-data-ingestor-microservice/package.json @@ -10,6 +10,7 @@ "author": "anbadrin", "license": "ISC", "dependencies": { + "amqplib": "^0.8.0", "aws-sdk": "^2.1066.0", "express": "^4.17.2" }, diff --git a/weather-data-ingestor-microservice/server.js b/weather-data-ingestor-microservice/server.js index afecd07..a2f2e64 100644 --- a/weather-data-ingestor-microservice/server.js +++ b/weather-data-ingestor-microservice/server.js @@ -1,12 +1,64 @@ + const express = require('express') const routes = require('./api/routes'); + + //init express const app = express() const port = 3001 app.use(express.json()); - routes(app); +// routes(app); app.listen(port, () => { - console.log(`Server is listening on port ${port}`) - }) \ No newline at end of file + console.log(`Server is listening on port ${port}`) +}) + + +// amqplib is a protocol for messaging . So use that +var amqp = require('amqplib/callback_api'); +//connect to the rabitmq server + +amqp.connect('amqp://localhost', function(error0, connection) { + if (error0) { + throw error0; + } + // create channel ,establiish connection and declare the queue + connection.createChannel(function(error1, channel) { + if (error1) { + throw error1; + } + var queue = 'rpc_queue'; + + channel.assertQueue(queue, { + durable: false + }); + // to spread the load equally over multiple servers we need to set the prefetch setting on the channel + channel.prefetch(1); + console.log(' [x] Awaiting RPC requests'); + channel.consume(queue, function reply(msg) { + + // var n = parseInt(msg.content.toString()); + + // console.log(" [.] fib(%d)", n); + + // var r = fibonacci(n); // function call. need to check with anita + console.log("msg"+msg.content.toJSON()); + var r= msg.content; + console.log(r.toJSON()); + channel.sendToQueue(msg.properties.replyTo, + Buffer.from(r.toString()), { + correlationId: msg.properties.correlationId + }); + + channel.ack(msg); + }); + }); +}); + +// function fibonacci(n) { +// if (n == 0 || n == 1) +// return n; +// else +// return fibonacci(n - 1) + fibonacci(n - 2); +// } \ No newline at end of file