Skip to content

Commit

Permalink
fix: verify that options is an object
Browse files Browse the repository at this point in the history
  • Loading branch information
AVVS committed Nov 13, 2019
1 parent 8f72f0a commit bef268a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
55 changes: 28 additions & 27 deletions bench/roundtrip.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const Promise = require('bluebird');
const Benchmark = require('benchmark');
const AMQPTransport = require('../lib');
const fmt = require('util').format;
const AMQPTransport = require('../lib');

const configuration = {
exchange: 'test-exchange',
Expand Down Expand Up @@ -29,10 +29,11 @@ function listener(message, headers, actions, callback) {
}

// opts for consumer
const opts = Object.assign({}, configuration, {
const opts = {
...configuration,
queue: 'tq',
listen: 'tq',
});
};

// publisher
const publisher = new AMQPTransport(configuration);
Expand All @@ -42,27 +43,27 @@ Promise.join(
AMQPTransport.connect(opts, listener),
publisher.connect()
)
.spread((consumer) => {
const suite = new Benchmark.Suite('RabbitMQ');
suite.add('Round-trip', {
defer: true,
fn: function test(deferred) {
return publisher
.publishAndWait('tq', 'tq')
.finally(() => {
messagesSent += 1;
deferred.resolve();
});
},
})
.on('complete', function suiteCompleted() {
const stats = this.filter('fastest')[0].stats;
const times = this.filter('fastest')[0].times;
process.stdout.write(fmt('Messages sent: %s\n', messagesSent));
process.stdout.write(fmt('Mean is %s ms ~ %s %\n', stats.mean * 1000, stats.rme));
process.stdout.write(fmt('Total time is %s s %s s\n', times.elapsed, times.period));
consumer.close();
publisher.close();
})
.run({ async: false, defer: true });
});
.spread((consumer) => {
const suite = new Benchmark.Suite('RabbitMQ');
suite.add('Round-trip', {
defer: true,
fn: function test(deferred) {
return publisher
.publishAndWait('tq', 'tq')
.finally(() => {
messagesSent += 1;
deferred.resolve();
});
},
})
.on('complete', function suiteCompleted() {
const { stats } = this.filter('fastest')[0];
const { times } = this.filter('fastest')[0];
process.stdout.write(fmt('Messages sent: %s\n', messagesSent));
process.stdout.write(fmt('Mean is %s ms ~ %s %\n', stats.mean * 1000, stats.rme));
process.stdout.write(fmt('Total time is %s s %s s\n', times.elapsed, times.period));
consumer.close();
publisher.close();
})
.run({ async: false, defer: true });
});
10 changes: 7 additions & 3 deletions src/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,9 @@ class AMQPTransport extends EventEmitter {

_replyOptions(options = {}) {
return {
simpleResponse: options.simpleResponse === undefined ? this._defaultOpts.simpleResponse : options.simpleResponse,
simpleResponse: options.simpleResponse === undefined
? this._defaultOpts.simpleResponse
: options.simpleResponse,
};
}

Expand Down Expand Up @@ -1126,6 +1128,8 @@ class AMQPTransport extends EventEmitter {
* @return {Promise}
*/
async createMessageHandler(routing, message, options, publishMessage, span) {
assert(typeof options === 'object' && options !== null, 'options must be an object');

const replyTo = options.replyTo || this._replyTo;
const time = process.hrtime();
const replyOptions = this._replyOptions(options);
Expand Down Expand Up @@ -1193,10 +1197,10 @@ class AMQPTransport extends EventEmitter {
expiration: Math.ceil(timeout * 0.9).toString(),
}, span)
.tap(() => {
this.log.trace('message published in %s', latency(time));
this.log.trace({ latency: latency(time) }, 'message published');
})
.catch((err) => {
this.log.error('error sending message', err);
this.log.error({ err }, 'error sending message');
replyStorage.reject(correlationId, err);
});

Expand Down

0 comments on commit bef268a

Please sign in to comment.