Skip to content

Commit

Permalink
RabbitMQ - Basics, Direct and Topics (Pattern Matching) examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajesh committed Dec 27, 2019
0 parents commit ebb248c
Show file tree
Hide file tree
Showing 14 changed files with 854 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.log
node_modules
17 changes: 17 additions & 0 deletions config/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module.exports = {
NODE_PORT: 3343,
RBMQ: {
SERVER: 'amqp://localhost',
EXCHANGE: {
C_VALIDATE_JSON: 'validateJSON',
T_VALIDATE_JSON: 'topic_validateJSON'
},
ROUTING: {
C_VALIDATE_JSON: 'validateJSON',
T_VALIDATE_JSON: 'topic_validateJSON'
},
QUEUE: {
T_VALIDATE_JSON: 'topic_validateJSON'
}
},
};
36 changes: 36 additions & 0 deletions direct/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const amqp = require("amqplib");
const domain = require("domain");
const CONFIG = require('../config/index');
const dom = domain.create();
let consumer = null;

dom.on("error", relisten);
dom.run(listen);

function listen() {
consumer = amqp.connect(CONFIG.RBMQ.SERVER);
consumer.then(function(conn) {
return conn.createChannel().then(function(ch) {
ch.assertExchange(CONFIG.RBMQ.ROUTING.C_VALIDATE_JSON, "direct", {durable: true, autoDelete: false});
//one-to-one messaging uses the default exchange, where queue name is the routing key
/**
* >>>> `exclusive` set to false; so that if consumer is down, the incoming message are in queue without deleting <<<<
*/
ch.assertQueue(CONFIG.RBMQ.ROUTING.C_VALIDATE_JSON, {durable: true, autoDelete: false, exclusive: false});
ch.consume(CONFIG.RBMQ.ROUTING.C_VALIDATE_JSON, function(message) {
//callback funtion on receiving messages
// console.log(message.content.toString());
}, {noAck: true});
});
}).then(null, function(err) {
console.error("Exception handled, reconnecting...\nDetail:\n" + err);
setTimeout(listen, 5000);
});
}

function relisten() {
consumer.then(function(conn) {
conn.close();
});
setTimeout(listen, 5000);
}
49 changes: 49 additions & 0 deletions direct/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const EXPRESS = require('express');
const APP = EXPRESS();
const CONFIG = require('../config/index');
const CORE = require('./producer');
const PORT = CONFIG.NODE_PORT;
let count = 0;

APP.use(EXPRESS.json());

APP.post('/api/v1/save', (req, res) => {
count++;
console.log('Request received - ', count);
try {
req.body = { name: 'api_c1 - '+count, timestamp: new Date(), metric: false };
const _payLoad = JSON.stringify(req.body);
CORE(CONFIG.RBMQ.EXCHANGE.C_VALIDATE_JSON, _payLoad, function (err, success) {
if (err) {
res.send(err);
} else {
res.status(200);
res.send({
code: 200,
message: 'Data received',
data: success
});
}
});
} catch (e) {
res.status(400);
res.send({
code: 400,
message: 'Bad request',
});
}
});

APP.post('/api/v1/delete', (req, res) => {
const amqp = require("amqplib");
const CONFIG = require('../config/index');
admin = amqp.connect(CONFIG.RBMQ.SERVER);
admin.then(function (conn) {
});
});

APP.listen(PORT, () => {
// require('http').globalAgent.maxSockets = 100000;
// console.log(http.globalAgent.maxSockets); // TODO: log!
console.log(`Node Server running at: http://localhost:${PORT}/`);
});
35 changes: 35 additions & 0 deletions direct/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const amqp = require("amqplib");
const CONFIG = require('../config/index');

function publisher(EXCHANGE, PAYLOAD, cb) {
try {
producer = amqp.connect(CONFIG.RBMQ.SERVER);
producer.then(function (conn) {
return conn.createConfirmChannel().then(function (ch) {
ch.assertExchange(EXCHANGE, 'direct', {
durable: true,
autoDelete: false
});
//assigning blank string to exchange is to use the default exchange, where queue name is the routing key
ch.publish('', CONFIG.RBMQ.ROUTING.C_VALIDATE_JSON, content = new Buffer(PAYLOAD), options = { contentType: "text/plain"}, function (err, ok) {
if (err != null) {
console.error("Error: failed to send message\n" + err);
cb(err);
} else {
cb(null);
conn.close();
// console.log('<<OK>>', ok); // TODO: log!
}
});
});
}).then(null, function (err) {
console.error('<<<<<< In then callback error => >>>>>> ', err);
cb(err);
});
} catch (error) {
console.error('<<<<<< In try catch callback error => >>>>>> ', error);
cb(error);
}
}

module.exports = publisher;
Loading

0 comments on commit ebb248c

Please sign in to comment.