From 08f54ce0ad72193eb1f635380b9a19345dcb3947 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 13 Dec 2018 12:28:30 -0500 Subject: [PATCH] AMQP-849: RT and DRTMLC - add ErrorHandler JIRA: https://jira.spring.io/browse/AMQP-849 Support the configuration of an error handler for exceptions when delivering replies (e.g. late replies). **cherry-pick to 2.0.x with adjustments to assertJ and docs** * Polishing - PR Comments * More polishing * Replace AssertJ with regular JUnit assertions # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateDirectReplyToContainerIntegrationTests.java # src/reference/asciidoc/amqp.adoc # src/reference/asciidoc/whats-new.adoc --- .../amqp/rabbit/core/RabbitTemplate.java | 84 +++++++++++-------- ...irectReplyToContainerIntegrationTests.java | 67 ++++++++++++++- src/reference/asciidoc/amqp.adoc | 4 + 3 files changed, 121 insertions(+), 34 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 23d4266f73..6fbcaefd7f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -91,6 +91,7 @@ import org.springframework.retry.RetryCallback; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; +import org.springframework.util.ErrorHandler; import org.springframework.util.StringUtils; import com.rabbitmq.client.AMQP; @@ -185,60 +186,52 @@ public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, private final AtomicInteger containerInstance = new AtomicInteger(); - private volatile String exchange = DEFAULT_EXCHANGE; + private String exchange = DEFAULT_EXCHANGE; - private volatile String routingKey = DEFAULT_ROUTING_KEY; + private String routingKey = DEFAULT_ROUTING_KEY; // The default queue name that will be used for synchronous receives. private volatile String queue; - private volatile long receiveTimeout = 0; + private long receiveTimeout = 0; - private volatile long replyTimeout = DEFAULT_REPLY_TIMEOUT; + private long replyTimeout = DEFAULT_REPLY_TIMEOUT; - private volatile MessageConverter messageConverter = new SimpleMessageConverter(); + private MessageConverter messageConverter = new SimpleMessageConverter(); - private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); + private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); - private volatile String encoding = DEFAULT_ENCODING; + private String encoding = DEFAULT_ENCODING; - private volatile String replyAddress; + private String replyAddress; - private volatile ConfirmCallback confirmCallback; + private ConfirmCallback confirmCallback; - private volatile ReturnCallback returnCallback; + private ReturnCallback returnCallback; - private volatile Boolean confirmsOrReturnsCapable; - - private volatile Expression mandatoryExpression = new ValueExpression(false); - - private volatile String correlationKey = null; + private Expression mandatoryExpression = new ValueExpression(false); - private volatile RetryTemplate retryTemplate; + private String correlationKey = null; - private volatile RecoveryCallback recoveryCallback; + private RetryTemplate retryTemplate; - private volatile Expression sendConnectionFactorySelectorExpression; - - private volatile Expression receiveConnectionFactorySelectorExpression; - - private volatile boolean usingFastReplyTo; + private RecoveryCallback recoveryCallback; - private volatile boolean useDirectReplyToContainer = true; + private Expression sendConnectionFactorySelectorExpression; - private volatile boolean evaluatedFastReplyTo; + private Expression receiveConnectionFactorySelectorExpression; - private volatile boolean useTemporaryReplyQueues; + private boolean useDirectReplyToContainer = true; - private volatile Collection beforePublishPostProcessors; + private boolean useTemporaryReplyQueues; - private volatile Collection afterReceivePostProcessors; + private Collection beforePublishPostProcessors; - private volatile CorrelationDataPostProcessor correlationDataPostProcessor; + private Collection afterReceivePostProcessors; - private volatile boolean isListener; + private CorrelationDataPostProcessor correlationDataPostProcessor; - private volatile Expression userIdExpression; + private Expression userIdExpression; private String beanName = "rabbitTemplate"; @@ -248,6 +241,18 @@ public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, private boolean usePublisherConnection; + private ErrorHandler replyErrorHandler; + + private volatile Boolean confirmsOrReturnsCapable; + + private volatile boolean publisherConfirms; + + private volatile boolean usingFastReplyTo; + + private volatile boolean evaluatedFastReplyTo; + + private volatile boolean isListener; + /** * Convenient constructor for use with setter injection. Don't forget to set the connection factory. */ @@ -694,9 +699,20 @@ public void setUsePublisherConnection(boolean usePublisherConnection) { this.usePublisherConnection = usePublisherConnection; } + /** + * When using a direct reply-to container for request/reply operations, set an error + * handler to be invoked when a reply delivery fails (e.g. due to a late reply). + * @param replyErrorHandler the reply error handler + * @since 2.0.11 + * @see #setUseDirectReplyToContainer(boolean) + */ + public void setReplyErrorHandler(ErrorHandler replyErrorHandler) { + this.replyErrorHandler = replyErrorHandler; + } + /** * Invoked by the container during startup so it can verify the queue is correctly - * configured (if a simple reply queue name is used instead of exchange/routingKey. + * configured (if a simple reply queue name is used instead of exchange/routingKey). * @return the queue name, if configured. * @since 1.5 */ @@ -1654,6 +1670,9 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M container.setAfterReceivePostProcessors(this.afterReceivePostProcessors .toArray(new MessagePostProcessor[this.afterReceivePostProcessors.size()])); } + if (this.replyErrorHandler != null) { + container.setErrorHandler(this.replyErrorHandler); + } container.start(); this.directReplyToContainers.put(connectionFactory, container); this.replyAddress = Address.AMQ_RABBITMQ_REPLY_TO; @@ -2231,8 +2250,7 @@ public void onMessage(Message message) { .getHeaders().get(this.correlationKey); } if (messageTag == null) { - logger.error("No correlation header in reply"); - return; + throw new AmqpRejectAndDontRequeueException("No correlation header in reply"); } PendingReply pendingReply = this.replyHolder.get(messageTag); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateDirectReplyToContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateDirectReplyToContainerIntegrationTests.java index 33e5305c75..eb886ce2bd 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateDirectReplyToContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateDirectReplyToContainerIntegrationTests.java @@ -16,16 +16,32 @@ package org.springframework.amqp.rabbit.core; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.utils.test.TestUtils; +import org.springframework.util.ErrorHandler; /** * @author Gary Russell @@ -46,10 +62,17 @@ protected RabbitTemplate createSendAndReceiveRabbitTemplate(ConnectionFactory co @SuppressWarnings("unchecked") @Test - public void channelReleasedOnTimeout() { + public void channelReleasedOnTimeout() throws Exception { final CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); RabbitTemplate template = createSendAndReceiveRabbitTemplate(connectionFactory); template.setReplyTimeout(1); + AtomicReference exception = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + ErrorHandler replyErrorHandler = t -> { + exception.set(t); + latch.countDown(); + }; + template.setReplyErrorHandler(replyErrorHandler); Object reply = template.convertSendAndReceive(ROUTE, "foo"); assertNull(reply); Object container = TestUtils.getPropertyValue(template, "directReplyToContainers", Map.class) @@ -57,6 +80,48 @@ public void channelReleasedOnTimeout() { ? connectionFactory.getPublisherConnectionFactory() : connectionFactory); assertEquals(0, TestUtils.getPropertyValue(container, "inUseConsumerChannels", Map.class).size()); + assertSame(replyErrorHandler, TestUtils.getPropertyValue(container, "errorHandler")); + Message replyMessage = new Message("foo".getBytes(), new MessageProperties()); + try { + template.onMessage(replyMessage); + } + catch (Exception e) { + assertThat(e, instanceOf(AmqpRejectAndDontRequeueException.class)); + assertEquals("No correlation header in reply", e.getMessage()); + } + + replyMessage.getMessageProperties().setCorrelationId("foo"); + + try { + template.onMessage(replyMessage); + } + catch (Exception e) { + assertThat(e, instanceOf(AmqpRejectAndDontRequeueException.class)); + assertEquals("Reply received after timeout", e.getMessage()); + } + + ExecutorService executor = Executors.newFixedThreadPool(1); + // Set up a consumer to respond to our producer + executor.submit(() -> { + Message message = template.receive(ROUTE, 10_000); + assertNotNull("No message received", message); + template.send(message.getMessageProperties().getReplyTo(), replyMessage); + return message; + }); + while (template.receive(ROUTE, 100) != null) { + // empty + } + reply = template.convertSendAndReceive(ROUTE, "foo"); + assertNull(reply); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertThat(exception.get(), instanceOf(ListenerExecutionFailedException.class)); + assertEquals("Reply received after timeout", exception.get().getCause().getMessage()); + assertArrayEquals(replyMessage.getBody(), + ((ListenerExecutionFailedException) exception.get()).getFailedMessage().getBody()); + assertEquals(0, TestUtils.getPropertyValue(container, "inUseConsumerChannels", Map.class).size()); + executor.shutdownNow(); + template.stop(); + connectionFactory.destroy(); } } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index dedce59ad0..9db49396b0 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -2993,6 +2993,10 @@ NOTE: This feature uses publisher returns and is enabled by setting `publisherRe `CachingConnectionFactory` (see <>). Also, you must not have registered your own `ReturnCallback` with the `RabbitTemplate`. +Starting with versions 2.0.11, 2.1.3, when using the default `DirectReplyToMessageListenerContainer`, you can add an error handler by setting the template's `replyErrorHandler` property. +This error handler will be invoked for any failed deliveries, such as late replies and messages received without a correlation header. +The exception passed in is a `ListenerExecutionFailedException` which has a `failedMessage` property. + [[direct-reply-to]] ===== RabbitMQ Direct reply-to