Skip to content

Commit

Permalink
GH-849: Pub. Confirm/Return defensive code
Browse files Browse the repository at this point in the history
Resolves #849

Add defensive catch blocks; we currently catch exceptions in user `handleConfirm`
calls, but not for returns, or for the code that determines which listener to
call.

Throwing exceptions to the client causes the channel to be killed.

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java
  • Loading branch information
garyrussell authored and artembilan committed Nov 19, 2018
1 parent 1317215 commit c582e1a
Showing 1 changed file with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,15 @@ public void handleNack(long seq, boolean multiple)
}

private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {
try {
doProcessAck(seq, ack, multiple, remove);
}
catch (Exception e) {
this.logger.error("Failed to process publisher confirm", e);
}
}

private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remove) {
if (multiple) {
/*
* Piggy-backed ack - extract all Listeners for this and earlier
Expand Down Expand Up @@ -905,12 +914,14 @@ private synchronized void processAck(long seq, boolean ack, boolean multiple, bo
Listener listener = this.listenerForSeq.remove(seq);
if (listener != null) {
SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
PendingConfirm pendingConfirm;
if (remove) {
pendingConfirm = confirmsForListener.remove(seq);
}
else {
pendingConfirm = confirmsForListener.get(seq);
PendingConfirm pendingConfirm = null;
if (confirmsForListener != null) { // should never happen; defensive
if (remove) {
pendingConfirm = confirmsForListener.remove(seq);
}
else {
pendingConfirm = confirmsForListener.get(seq);
}
}
if (pendingConfirm != null) {
doHandleConfirm(ack, listener, pendingConfirm);
Expand Down Expand Up @@ -957,14 +968,25 @@ public void handleReturn(int replyCode,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String uuidObject = properties.getHeaders().get(RETURN_CORRELATION_KEY).toString();
Listener listener = this.listeners.get(uuidObject);
Listener listener = null;
if (uuidObject != null) {
listener = this.listeners.get(uuidObject);
}
else {
this.logger.error("No 'spring_listener_return_correlation' header in returned message");
}
if (listener == null || !listener.isReturnListener()) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("No Listener for returned message");
}
}
else {
listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
try {
listener.handleReturn(replyCode, replyText, exchange, routingKey, properties, body);
}
catch (Exception e) {
this.logger.error("Exception delivering returned message ", e);
}
}
}

Expand Down

0 comments on commit c582e1a

Please sign in to comment.