Skip to content

Commit

Permalink
feat: adds pre/after events for _onConsume/reply (#25)
Browse files Browse the repository at this point in the history
Used in the build-in router
  • Loading branch information
AVVS authored Aug 29, 2017
1 parent 627830c commit 1517368
Show file tree
Hide file tree
Showing 4 changed files with 590 additions and 310 deletions.
22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"compile": "babel -d ./lib ./src",
"lint": "eslint ./src",
"test": "npm run lint && npm run test:integration",
"test:integration": "cross-env NODE_ENV=test nyc ./node_modules/.bin/_mocha",
"test:integration": "cross-env NODE_ENV=test nyc mocha",
"prepublishOnly": "npm run compile",
"bench": "npm run compile && node ./bench/roundtrip.js",
"semantic-release": "semantic-release pre && npm publish && semantic-release post",
Expand All @@ -28,30 +28,30 @@
},
"homepage": "https://github.com/microfleet/transport-amqp#readme",
"devDependencies": {
"@makeomatic/deploy": "^4.1.3",
"babel-cli": "^6.24.1",
"@makeomatic/deploy": "^4.1.5",
"babel-cli": "^6.26.0",
"babel-eslint": "^7.2.3",
"babel-plugin-istanbul": "^4.1.4",
"babel-plugin-transform-class-properties": "^6.24.1",
"babel-plugin-transform-object-rest-spread": "^6.23.0",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"babel-plugin-transform-strict-mode": "^6.24.1",
"babel-register": "^6.24.1",
"babel-register": "^6.26.0",
"benchmark": "^2.1.4",
"chai": "^4.1.1",
"codecov": "^2.3.0",
"cross-env": "^5.0.3",
"cross-env": "^5.0.5",
"cz-conventional-changelog": "^2.0.0",
"eslint": "^4.4.0",
"eslint-config-makeomatic": "^1.0.1",
"eslint": "^4.5.0",
"eslint-config-makeomatic": "^1.1.0",
"eslint-plugin-import": "^2.7.0",
"eslint-plugin-mocha": "^4.11.0",
"eslint-plugin-promise": "^3.5.0",
"jaeger-client": "^3.5.3",
"microtime": "^2.1.6",
"mocha": "^3.5.0",
"nyc": "^11.0.3",
"semantic-release": "^6.3.2",
"sinon": "^3.0.0",
"semantic-release": "^7.0.2",
"sinon": "^3.2.1",
"stdout-stream": "^1.4.0"
},
"peerDependencies": {
Expand All @@ -62,7 +62,7 @@
"@microfleet/amqp-coffee": "^1.1.0",
"bluebird": "^3.5.0",
"common-errors": "^1.0.4",
"debug": "^2.6.8",
"debug": "^3.0.1",
"eventemitter3": "^2.0.3",
"hashlru": "^2.2.0",
"is": "^3.2.1",
Expand Down
33 changes: 26 additions & 7 deletions src/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ const initRoutingFn = (messageHandler, transport) => {
function responseHandler(raw, error, data) {
const { properties, span } = raw;
return !properties.replyTo || !properties.correlationId
? transport.noop(error, data, span)
: transport.reply(properties, { error, data }, span);
? transport.noop(error, data, span, raw)
: transport.reply(properties, { error, data }, span, raw);
}

/**
Expand Down Expand Up @@ -230,19 +230,25 @@ class AMQPTransport extends EventEmitter {
* Noop function with empty correlation id and reply to data
* @param {Error} err
* @param {Mixed} data
* @param {Span} [span]
* @param {AMQPMessage} [raw]
*/
noop(error, data, span) {
noop(error, data, span, raw) {
const msg = stringify({ error, data }, jsonSerializer);
this.log.debug('when replying to message with %s response could not be delivered', msg);

if (span) {
if (span !== undefined) {
if (error) {
span.setTag(Tags.ERROR, true);
span.log({ event: 'error', 'error.object': error, message: error.message, stack: error.stack });
}

span.finish();
}

if (raw !== undefined) {
this.emit('after', raw);
}
}

close() {
Expand Down Expand Up @@ -859,21 +865,32 @@ class AMQPTransport extends EventEmitter {
*
* @param {Object} headers - incoming message headers
* @param {Mixed} message - message to send
* @param {Span} [span] - opentracing span.
* @param {AMQPMessage} [raw] - raw message.
*/
reply(properties, message, span) {
reply(properties, message, span, raw) {
if (!properties.replyTo || !properties.correlationId) {
const error = new ValidationError('replyTo and correlationId not found in properties', 400);

if (span) {
if (span !== undefined) {
span.setTag(Tags.ERROR, true);
span.log({ event: 'error', 'error.object': error, message: error.message, stack: error.stack });
span.finish();
}

if (raw !== undefined) {
this.emit('after', raw);
}

return Promise.reject(error);
}

const promise = this.send(properties.replyTo, message, { correlationId: properties.correlationId }, span);
let promise = this.send(properties.replyTo, message, { correlationId: properties.correlationId }, span);

if (raw !== undefined) {
promise = promise
.finally(() => this.emit('after', raw));
}

return span === undefined
? promise
Expand Down Expand Up @@ -1009,6 +1026,8 @@ class AMQPTransport extends EventEmitter {
return function consumeMessage(originalMessage) {
const properties = originalMessage.properties;

amqpTransport.emit('pre', originalMessage);

// pass to the consumer message router
// data - properties - originalMessage
return router.call(
Expand Down
11 changes: 11 additions & 0 deletions test/amqp-transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {

describe('AMQPTransport.multiConnect', () => {
let acksCalled = 0;
const preCount = sinon.spy();
const postCount = sinon.spy();

const conf = {
exchange: configuration.exchange,
Expand All @@ -308,6 +310,7 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
callback(null, message);
});

// adds QoS for the first queue, but not all the others
const consumer = AMQPTransport.multiConnect(conf, spy, [{
neck: 1,
}]);
Expand All @@ -317,6 +320,9 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
return Promise.join(consumer, publisher, (multi, amqp) => {
this.multi = multi;
this.publisher = amqp;

this.multi.on('pre', preCount);
this.multi.on('after', postCount);
});
});

Expand All @@ -342,6 +348,7 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
.map(pub, message => (
this.publisher.publishAndWait(message.route, message.message)
))
.delay(10) // to allow async action to call 'after'
.then((responses) => {
assert.equal(acksCalled, q1.length);

Expand All @@ -351,6 +358,10 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
});

assert.equal(this.spy.callCount, pub.length);

// ensure that pre & after are called for each message
assert.equal(preCount.callCount, pub.length);
assert.equal(postCount.callCount, pub.length);
});
});

Expand Down
Loading

0 comments on commit 1517368

Please sign in to comment.