Skip to content

Commit

Permalink
feat: add error handler on a created consumer
Browse files Browse the repository at this point in the history
It doesn't do much, because all of these events are generally triggered by a publisher, but it's a good example
of how error handling should be done. Start working on consume cancel notification.
Requires dropbox/amqp-coffee#52 to be completed
  • Loading branch information
AVVS committed Jan 14, 2016
1 parent 5420c51 commit 4e7b845
Showing 1 changed file with 43 additions and 6 deletions.
49 changes: 43 additions & 6 deletions src/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class AMQPTransport extends EventEmitter {
password: 'guest',
vhost: '/',
temporaryChannelTimeout: 6000,
clientProperties: {
capabilities: {
consumer_cancel_notify: true,
},
},
},
};

Expand Down Expand Up @@ -164,15 +169,47 @@ class AMQPTransport extends EventEmitter {
}

// pipeline for establishing consumer
// TODO: add error handler here
function establishConsumer() {
return establishQueue().then(createExchange);
return establishQueue()
.tap(createExchange)
.then(function establishErrorHandlers({ consumer }) {
consumer.on('error', function handleError(err) {
// https://www.rabbitmq.com/amqp-0-9-1-reference.html -
switch (err.replyCode) {
case 311:
case 313:
this.log('error working with a channel:', err);
return null;

// access-refused 403
// The client attempted to work with a server entity
// to which it has no access due to security settings.
// not-found 404
// The client attempted to work with a server entity that does not exist.
// resource-locked 405
// The client attempted to work with a server entity
// to which it has no access because another client is working with it.
// precondition-failed 406
// The client requested a method that was not allowed
// because some precondition failed.
default:
this.log('re-establishing connection after %d', err.replyCode);

// don't wait for this to complete
consumer.removeAllListeners();
// eat errors
consumer.on('error', ld.noop);
consumer.close();

// TODO: add exponential back-off
return Promise.delay(500).then(establishConsumer);
}
});
});
}

return establishConsumer().then(() => {
// make sure we recreate queue and establish consumer on reconnect
amqp.on('ready', establishConsumer);
});
// make sure we recreate queue and establish consumer on reconnect
return establishConsumer().then(() => amqp.on('ready', establishConsumer));
}

return amqp.connect()
Expand Down

0 comments on commit 4e7b845

Please sign in to comment.