Skip to content

Commit

Permalink
feat: emits 'publish' event after message has been sent
Browse files Browse the repository at this point in the history
BREAKING CHANGE: update dependencies & require to have LTS version of
node
  • Loading branch information
AVVS committed Jan 12, 2018
1 parent 49df877 commit 7234a89
Show file tree
Hide file tree
Showing 7 changed files with 1,031 additions and 1,464 deletions.
34 changes: 34 additions & 0 deletions .commitlintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module.exports = {
rules: {
'body-leading-blank': [1, 'always'],
'footer-leading-blank': [1, 'always'],
'header-max-length': [2, 'always', 72],
'scope-case': [2, 'always', 'lower-case'],
'subject-case': [
2,
'never',
['sentence-case', 'start-case', 'pascal-case', 'upper-case']
],
'subject-empty': [2, 'never'],
'subject-full-stop': [2, 'never', '.'],
'type-case': [2, 'always', 'lower-case'],
'type-empty': [2, 'never'],
'type-enum': [2, 'always', [
'build',
'ci',
'docs',
'feat',
'fix',
'perf',
'refactor',
'revert',
'style',
'test',
'major',
'minor',
'patch',
'chore'
]
]
}
};
16 changes: 16 additions & 0 deletions .releaserc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"branch": "master",
"verifyConditions": ["@semantic-release/npm", "@semantic-release/github"],
"analyzeCommits": {
"preset": "angular",
"releaseRules": [
{ "type": "docs", "release": "patch" },
{ "type": "refactor", "release": "patch" },
{ "type": "style", "release": "patch" },
{ "type": "minor", "release": "minor" },
{ "type": "patch", "release": "patch" },
{ "type": "major", "release": "major" },
{ "type": "breaking", "release": "major" }
]
}
}
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ The MIT License (MIT)

Copyright (c) 2015-2017 Vitaly Aminev
Copyright (c) 2015-2017 Remi Development L.P.
Copyright (c) 2017-2018 Makeomatic Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
48 changes: 15 additions & 33 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
"scripts": {
"compile": "babel -d ./lib ./src",
"lint": "eslint ./src",
"test": "npm run lint && npm run test:integration",
"test": "yarn lint && yarn test:integration",
"test:integration": "cross-env NODE_ENV=test nyc mocha",
"prepublishOnly": "npm run compile",
"bench": "npm run compile && node ./bench/roundtrip.js",
"semantic-release": "semantic-release pre && npm publish && semantic-release post",
"prepublishOnly": "yarn compile",
"bench": "yarn compile && node ./bench/roundtrip.js",
"semantic-release": "semantic-release",
"commit": "simple-commit-message"
},
"engines": {
"node": ">= 6.2.0",
"npm": ">= 3.x.x"
"node": ">= 8.9.0",
"npm": ">= 4.x.x"
},
"repository": {
"type": "git",
Expand All @@ -28,9 +28,9 @@
},
"homepage": "https://github.com/microfleet/transport-amqp#readme",
"devDependencies": {
"@makeomatic/deploy": "^4.1.9",
"@makeomatic/deploy": "^5.0.2",
"babel-cli": "^6.26.0",
"babel-eslint": "^8.0.3",
"babel-eslint": "^8.2.1",
"babel-plugin-istanbul": "^4.1.4",
"babel-plugin-transform-class-properties": "^6.24.1",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
Expand All @@ -41,17 +41,16 @@
"codecov": "^3.0.0",
"cross-env": "^5.1.3",
"cz-conventional-changelog": "^2.1.0",
"eslint": "^4.13.1",
"eslint": "^4.15.0",
"eslint-config-makeomatic": "^2.0.1",
"eslint-plugin-import": "^2.8.0",
"eslint-plugin-mocha": "^4.11.0",
"eslint-plugin-promise": "^3.6.0",
"jaeger-client": "^3.7.0",
"microtime": "^2.1.6",
"mocha": "^4.0.1",
"microtime": "^2.1.7",
"mocha": "^4.1.0",
"nyc": "^11.4.1",
"semantic-release": "8.x.x",
"sinon": "^4.1.3",
"sinon": "^4.1.4",
"stdout-stream": "^1.4.0"
},
"peerDependencies": {
Expand All @@ -66,34 +65,17 @@
"eventemitter3": "^3.0.0",
"hashlru": "^2.2.0",
"is": "^3.2.1",
"joi": "^13.0.2",
"joi": "^13.1.0",
"json-stringify-safe": "^5.0.1",
"lodash": "^4.17.2",
"object-hash": "^1.2.0",
"opentracing": "^0.14.1",
"pino": "^4.10.2",
"pino": "^4.10.3",
"uuid": "^3.1.0"
},
"files": [
"lib/",
"src/",
"yarn.lock"
],
"release": {
"verifyConditions": "@makeomatic/condition-semaphore",
"analyzeCommits": "simple-commit-message",
"generateNotes": "github-post-release",
"getLastRelease": "@makeomatic/last-release-npm",
"branch": "master"
},
"config": {
"pre-git": {
"commit-msg": "simple",
"pre-commit": [],
"pre-push": [],
"post-commit": [],
"post-checkout": [],
"post-merge": []
}
}
]
}
72 changes: 36 additions & 36 deletions src/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -960,10 +960,7 @@ class AMQPTransport extends EventEmitter {

return promise
.return(this)
.call('createMessageHandler', routing, message, options, publishMessage, span)
.finally(() => {
this.log.debug('private queue resolved after %s', latency(time));
});
.call('createMessageHandler', routing, message, options, publishMessage, span);
}

// work with cache if options.cache is set and is number
Expand All @@ -973,14 +970,14 @@ class AMQPTransport extends EventEmitter {
return Promise.resolve(cachedResponse.value);
}

// slightly longer timeout, if message was not consumed in time, it will return with expiration
return new Promise((resolve, reject) => {
const { replyStorage } = this;
// generate response id
const correlationId = options.correlationId || uuid.v4();
// timeout before RPC times out
const timeout = options.timeout || this.config.timeout;
const { replyStorage } = this;
// generate response id
const correlationId = options.correlationId || uuid.v4();
// timeout before RPC times out
const timeout = options.timeout || this.config.timeout;

// slightly longer timeout, if message was not consumed in time, it will return with expiration
const publishPromise = new Promise((resolve, reject) => {
// push into RPC request storage
replyStorage.push(correlationId, {
timeout,
Expand All @@ -991,35 +988,38 @@ class AMQPTransport extends EventEmitter {
cache: cachedResponse,
timer: null,
});
});

// debugging
this.log.trace('message pushed into reply queue in %s', latency(time));
// debugging
this.log.trace('message pushed into reply queue in %s', latency(time));

// add custom header for routing over amq.headers exchange
set(options, 'headers.reply-to', replyTo);
// add custom header for routing over amq.headers exchange
set(options, 'headers.reply-to', replyTo);

// add opentracing instrumentation
if (span) {
this.tracer.inject(span.context(), FORMAT_TEXT_MAP, options.headers);
}
// add opentracing instrumentation
if (span) {
this.tracer.inject(span.context(), FORMAT_TEXT_MAP, options.headers);
}

// this is to ensure that queue is not overflown and work will not
// be completed later on
publishMessage
.call(this, routing, message, {
...options,
replyTo,
correlationId,
expiration: Math.ceil(timeout * 0.9).toString(),
}, span)
.tap(() => {
this.log.trace('message published in %s', latency(time));
})
.catch((err) => {
this.log.error('error sending message', err);
replyStorage.reject(correlationId, err);
});
});
// this is to ensure that queue is not overflown and work will not
// be completed later on
publishMessage
.call(this, routing, message, {
...options,
replyTo,
correlationId,
expiration: Math.ceil(timeout * 0.9).toString(),
}, span)
.tap(() => {
this.log.trace('message published in %s', latency(time));
this.emit('publish', routing, message);
})
.catch((err) => {
this.log.error('error sending message', err);
replyStorage.reject(correlationId, err);
});

return publishPromise;
}

/**
Expand Down
8 changes: 4 additions & 4 deletions test/amqp-transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const stringify = require('json-stringify-safe');
const sinon = require('sinon');
const assert = require('assert');
const microtime = require('microtime');
const MockTracer = require('opentracing/lib/mock_tracer').MockTracer;
const { MockTracer } = require('opentracing/lib/mock_tracer');
const debug = require('debug')('amqp');

// add inject/extract implementation
Expand Down Expand Up @@ -277,7 +277,7 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
];

return Promise.all(promises).spread((initial, cached, nonCached) => {
const toMiliseconds = latency.toMiliseconds;
const { toMiliseconds } = latency;
assert.equal(toMiliseconds(initial.time), toMiliseconds(cached.time));
assert(toMiliseconds(initial.time) < toMiliseconds(nonCached.time));
});
Expand Down Expand Up @@ -536,7 +536,7 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
}

it('reestablishing consumed queue', () => {
const transport = this.transport;
const { transport } = this;
const publish = () => transport.publishAndWait('/', { foo: 'bar' }, { confirm: true });

return transport
Expand All @@ -554,7 +554,7 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {
});

it('should create consumed queue', (done) => {
const transport = this.transport;
const { transport } = this;
transport.on('consumed-queue-reconnected', (consumer, queue) => {
debug('initial reconnect');
// #2 reconnected, try publish
Expand Down
Loading

0 comments on commit 7234a89

Please sign in to comment.