diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java index 865b274c43..de67cf1ce3 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java @@ -49,6 +49,8 @@ */ public class RepublishMessageRecoverer implements MessageRecoverer { + private static final int ELIPSIS_LENGTH = 3; + public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace"; public static final String X_EXCEPTION_MESSAGE = "x-exception-message"; @@ -59,6 +61,8 @@ public class RepublishMessageRecoverer implements MessageRecoverer { public static final int DEFAULT_FRAME_MAX_HEADROOM = 20_000; + private static final int MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE = 100 - ELIPSIS_LENGTH; + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR protected final AmqpTemplate errorTemplate; // NOSONAR @@ -150,9 +154,15 @@ protected MessageDeliveryMode getDeliveryMode() { public void recover(Message message, Throwable cause) { MessageProperties messageProperties = message.getMessageProperties(); Map headers = messageProperties.getHeaders(); - String stackTraceAsString = processStackTrace(cause); + String exceptionMessage = cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage(); + String[] processed = processStackTrace(cause, exceptionMessage); + String stackTraceAsString = processed[0]; + String truncatedExceptionMessage = processed[1]; + if (truncatedExceptionMessage != null) { + exceptionMessage = truncatedExceptionMessage; + } headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString); - headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); + headers.put(X_EXCEPTION_MESSAGE, exceptionMessage); headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange()); headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey()); Map additionalHeaders = additionalHeaders(message, cause); @@ -183,7 +193,7 @@ public void recover(Message message, Throwable cause) { } } - private String processStackTrace(Throwable cause) { + private String[] processStackTrace(Throwable cause, String exceptionMessage) { String stackTraceAsString = getStackTraceAsString(cause); if (this.maxStackTraceLength < 0) { int maxStackTraceLen = RabbitUtils @@ -193,12 +203,39 @@ private String processStackTrace(Throwable cause) { this.maxStackTraceLength = maxStackTraceLen; } } - if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) { - stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength); - this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; " - + "consider increasing frame_max on the broker or reduce the stack trace depth", cause); + boolean truncated = false; + String truncatedExceptionMessage = exceptionMessage.length() <= MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE + ? exceptionMessage + : (exceptionMessage.substring(0, MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE) + "..."); + if (this.maxStackTraceLength > 0) { + if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStackTraceLength) { + if (!exceptionMessage.equals(truncatedExceptionMessage)) { + int start = stackTraceAsString.indexOf(exceptionMessage); + stackTraceAsString = stackTraceAsString.substring(0, start) + + truncatedExceptionMessage + + stackTraceAsString.substring(start + exceptionMessage.length()); + } + int adjustedStackTraceLen = this.maxStackTraceLength - truncatedExceptionMessage.length(); + if (adjustedStackTraceLen > 0) { + if (stackTraceAsString.length() > adjustedStackTraceLen) { + stackTraceAsString = stackTraceAsString.substring(0, adjustedStackTraceLen); + this.logger.warn("Stack trace in republished message header truncated due to frame_max " + + "limitations; " + + "consider increasing frame_max on the broker or reduce the stack trace depth", cause); + truncated = true; + } + else if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStackTraceLength) { + this.logger.warn("Exception message in republished message header truncated due to frame_max " + + "limitations; consider increasing frame_max on the broker or reduce the exception " + + "message size", cause); + truncatedExceptionMessage = exceptionMessage.substring(0, + this.maxStackTraceLength - stackTraceAsString.length() - ELIPSIS_LENGTH) + "..."; + truncated = true; + } + } + } } - return stackTraceAsString; + return new String[] { stackTraceAsString, truncated ? truncatedExceptionMessage : null }; } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java index fcce292fce..03ab000213 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java @@ -30,11 +30,12 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; - -import com.rabbitmq.client.LongString; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; /** * @author Gary Russell + * @author Artem Bilan + * * @since 2.0.5 * */ @@ -43,30 +44,99 @@ public class RepublishMessageRecovererIntegrationTests { public static final String BIG_HEADER_QUEUE = "big.header.queue"; - private static final String BIG_EXCEPTION_MESSAGE = new String(new byte[10_000]).replaceAll("\u0000", "x"); + private static final String BIG_EXCEPTION_MESSAGE1 = new String(new byte[10_000]).replace("\u0000", "x"); + + private static final String BIG_EXCEPTION_MESSAGE2 = new String(new byte[10_000]).replace("\u0000", "y"); private int maxHeaderSize; @Test public void testBigHeader() { - RabbitTemplate template = new RabbitTemplate( - new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory())); - this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) - 20_000; + CachingConnectionFactory ccf = new CachingConnectionFactory( + RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()); + RabbitTemplate template = new RabbitTemplate(ccf); + this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) + - RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM; + assertThat(this.maxHeaderSize).isGreaterThan(0); + RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE); + recoverer.recover(new Message("foo".getBytes(), new MessageProperties()), + new ListenerExecutionFailedException("Listener failed", + bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE1)), null)); + Message received = template.receive(BIG_HEADER_QUEUE, 10_000); + assertThat(received).isNotNull(); + String trace = + received.getMessageProperties() + .getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString(); + assertThat(trace.length()).isEqualTo(this.maxHeaderSize - 100); + String truncatedMessage = + "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx..."; + assertThat(trace).contains(truncatedMessage); + assertThat((String) received.getMessageProperties().getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE)) + .isEqualTo(truncatedMessage); + ccf.destroy(); + } + + @Test + public void testSmallException() { + CachingConnectionFactory ccf = new CachingConnectionFactory( + RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()); + RabbitTemplate template = new RabbitTemplate(ccf); + this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) + - RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM; + assertThat(this.maxHeaderSize).isGreaterThan(0); + RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE); + ListenerExecutionFailedException cause = new ListenerExecutionFailedException("Listener failed", + new RuntimeException(new String(new byte[200]).replace('\u0000', 'x')), null); + recoverer.recover(new Message("foo".getBytes(), new MessageProperties()), + cause); + Message received = template.receive(BIG_HEADER_QUEUE, 10_000); + assertThat(received).isNotNull(); + String trace = received.getMessageProperties().getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString(); + assertThat(trace).isEqualTo(getStackTraceAsString(cause)); + ccf.destroy(); + } + + @Test + public void testBigMessageSmallTrace() { + CachingConnectionFactory ccf = new CachingConnectionFactory( + RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()); + RabbitTemplate template = new RabbitTemplate(ccf); + this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) + - RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM; assertThat(this.maxHeaderSize).isGreaterThan(0); RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE); + ListenerExecutionFailedException cause = new ListenerExecutionFailedException("Listener failed", + new RuntimeException(new String(new byte[this.maxHeaderSize]).replace('\u0000', 'x'), + new IllegalStateException("foo")), null); recoverer.recover(new Message("foo".getBytes(), new MessageProperties()), - bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE))); + cause); Message received = template.receive(BIG_HEADER_QUEUE, 10_000); assertThat(received).isNotNull(); - assertThat(((LongString) received.getMessageProperties().getHeaders() - .get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE)).length()).isEqualTo(this.maxHeaderSize); + String trace = received.getMessageProperties().getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString(); + assertThat(trace).contains("Caused by: java.lang.IllegalStateException"); + String exceptionMessage = + received.getMessageProperties() + .getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE).toString(); + assertThat(trace.length() + exceptionMessage.length()).isEqualTo(this.maxHeaderSize); + assertThat(exceptionMessage).endsWith("..."); + ccf.destroy(); } private Throwable bigCause(Throwable cause) { - if (getStackTraceAsString(cause).length() > this.maxHeaderSize) { + int length = getStackTraceAsString(cause).length(); + int wantThisSize = this.maxHeaderSize + RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM; + if (length > wantThisSize) { return cause; } - return bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE, cause)); + String msg = length + BIG_EXCEPTION_MESSAGE1.length() > wantThisSize + ? BIG_EXCEPTION_MESSAGE1 + : BIG_EXCEPTION_MESSAGE2; + return bigCause(new RuntimeException(msg, cause)); } private String getStackTraceAsString(Throwable cause) { diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index b81db542d5..7e8c860151 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -5644,6 +5644,16 @@ RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate ---- ==== +Starting with version 2.0.5, the stack trace may be truncated if it is too large; this is because all headers have to fit in a single frame. +By default, if the stack trace would cause less than 20,000 bytes ('headroom') to be available for other headers, it will be truncated. +This can be adjusted by setting the recoverer's `frameMaxHeadroom` property, if you need more or less space for other headers. +Starting with versions 2.1.13, 2.2.3, the exception message is included in this calculation, and the amount of stack trace will be maximized using the following algorithm: + +* if the stack trace alone would exceed the limit, the exception message header will be truncated to 97 bytes plus `...` and the stack trace is truncated too. +* if the stack trace is small, the message will be truncated (plus `...`) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus `...`). + +Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information. + Starting with version 2.1, an `ImmediateRequeueMessageRecoverer` is added to throw an `ImmediateRequeueAmqpException`, which notifies a listener container to requeue the current failed message. ===== Exception Classification for Spring Retry