Skip to content

Commit

Permalink
GH-1130: Repub Recoverer include ex. message size
Browse files Browse the repository at this point in the history
Resolves #1130

`RepublishingMessageRecoverer` - include the exception message length in the
truncation algorithm.

Note that the message is included in the stack trace.

If the stack trace + exception message would exceed the limit
- first truncate the message within the stack trace to 100 bytes
-- if the stack trace and original message still exceed the limit
-- also truncate the `X_EXCEPTION_MESSAGE` header to 100 bytes and
   use the remaing space for stack trace

If, after truncating the message in the stack trace, there is room remaining
the full stack trace as well as the truncated message, re-truncate the
`X_EXCEPTION_MESSAGE` header to use the remaining available bytes.

examples:
  message 150 bytes, stack trace 350 bytes, available 300 bytes,
  stack trace after message truncation 300 bytes
   - truncate message to 100, trace to 200

  message 200 bytes, stack trace 250 bytes, available 300 bytes,
  stack trace after message truncation 150 bytes
   - truncate message to 100 bytes, trace to 200

  message 200 bytes, stack trace 250 bytes, available 300 bytes,
  stack trace after message truncation 150 bytes
   - truncate message to 150 bytes, trace remains at 150

These are for illustration only, the available bytes is generally must larger.

**cherry-pick to 2.1.x**

* Fix conflicts in the `RepublishMessageRecovererIntegrationTests` for
current code base around `ListenerExecutionFailedException` and `MessageProperties`
  • Loading branch information
garyrussell authored and artembilan committed Dec 6, 2019
1 parent ce677bf commit 5cadcc3
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
*/
public class RepublishMessageRecoverer implements MessageRecoverer {

private static final int ELIPSIS_LENGTH = 3;

public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
Expand All @@ -59,6 +61,8 @@ public class RepublishMessageRecoverer implements MessageRecoverer {

public static final int DEFAULT_FRAME_MAX_HEADROOM = 20_000;

private static final int MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE = 100 - ELIPSIS_LENGTH;

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR

protected final AmqpTemplate errorTemplate; // NOSONAR
Expand Down Expand Up @@ -150,9 +154,15 @@ protected MessageDeliveryMode getDeliveryMode() {
public void recover(Message message, Throwable cause) {
MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
String stackTraceAsString = processStackTrace(cause);
String exceptionMessage = cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage();
String[] processed = processStackTrace(cause, exceptionMessage);
String stackTraceAsString = processed[0];
String truncatedExceptionMessage = processed[1];
if (truncatedExceptionMessage != null) {
exceptionMessage = truncatedExceptionMessage;
}
headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString);
headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put(X_EXCEPTION_MESSAGE, exceptionMessage);
headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange());
headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey());
Map<? extends String, ? extends Object> additionalHeaders = additionalHeaders(message, cause);
Expand Down Expand Up @@ -183,7 +193,7 @@ public void recover(Message message, Throwable cause) {
}
}

private String processStackTrace(Throwable cause) {
private String[] processStackTrace(Throwable cause, String exceptionMessage) {
String stackTraceAsString = getStackTraceAsString(cause);
if (this.maxStackTraceLength < 0) {
int maxStackTraceLen = RabbitUtils
Expand All @@ -193,12 +203,39 @@ private String processStackTrace(Throwable cause) {
this.maxStackTraceLength = maxStackTraceLen;
}
}
if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) {
stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength);
this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; "
+ "consider increasing frame_max on the broker or reduce the stack trace depth", cause);
boolean truncated = false;
String truncatedExceptionMessage = exceptionMessage.length() <= MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE
? exceptionMessage
: (exceptionMessage.substring(0, MAX_EXCEPTION_MESSAGE_SIZE_IN_TRACE) + "...");
if (this.maxStackTraceLength > 0) {
if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStackTraceLength) {
if (!exceptionMessage.equals(truncatedExceptionMessage)) {
int start = stackTraceAsString.indexOf(exceptionMessage);
stackTraceAsString = stackTraceAsString.substring(0, start)
+ truncatedExceptionMessage
+ stackTraceAsString.substring(start + exceptionMessage.length());
}
int adjustedStackTraceLen = this.maxStackTraceLength - truncatedExceptionMessage.length();
if (adjustedStackTraceLen > 0) {
if (stackTraceAsString.length() > adjustedStackTraceLen) {
stackTraceAsString = stackTraceAsString.substring(0, adjustedStackTraceLen);
this.logger.warn("Stack trace in republished message header truncated due to frame_max "
+ "limitations; "
+ "consider increasing frame_max on the broker or reduce the stack trace depth", cause);
truncated = true;
}
else if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStackTraceLength) {
this.logger.warn("Exception message in republished message header truncated due to frame_max "
+ "limitations; consider increasing frame_max on the broker or reduce the exception "
+ "message size", cause);
truncatedExceptionMessage = exceptionMessage.substring(0,
this.maxStackTraceLength - stackTraceAsString.length() - ELIPSIS_LENGTH) + "...";
truncated = true;
}
}
}
}
return stackTraceAsString;
return new String[] { stackTraceAsString, truncated ? truncatedExceptionMessage : null };
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;

import com.rabbitmq.client.LongString;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.0.5
*
*/
Expand All @@ -43,30 +44,99 @@ public class RepublishMessageRecovererIntegrationTests {

public static final String BIG_HEADER_QUEUE = "big.header.queue";

private static final String BIG_EXCEPTION_MESSAGE = new String(new byte[10_000]).replaceAll("\u0000", "x");
private static final String BIG_EXCEPTION_MESSAGE1 = new String(new byte[10_000]).replace("\u0000", "x");

private static final String BIG_EXCEPTION_MESSAGE2 = new String(new byte[10_000]).replace("\u0000", "y");

private int maxHeaderSize;

@Test
public void testBigHeader() {
RabbitTemplate template = new RabbitTemplate(
new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()));
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) - 20_000;
CachingConnectionFactory ccf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
RabbitTemplate template = new RabbitTemplate(ccf);
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory())
- RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM;
assertThat(this.maxHeaderSize).isGreaterThan(0);
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE);
recoverer.recover(new Message("foo".getBytes(), new MessageProperties()),
new ListenerExecutionFailedException("Listener failed",
bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE1)), null));
Message received = template.receive(BIG_HEADER_QUEUE, 10_000);
assertThat(received).isNotNull();
String trace =
received.getMessageProperties()
.getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString();
assertThat(trace.length()).isEqualTo(this.maxHeaderSize - 100);
String truncatedMessage =
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx...";
assertThat(trace).contains(truncatedMessage);
assertThat((String) received.getMessageProperties().getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE))
.isEqualTo(truncatedMessage);
ccf.destroy();
}

@Test
public void testSmallException() {
CachingConnectionFactory ccf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
RabbitTemplate template = new RabbitTemplate(ccf);
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory())
- RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM;
assertThat(this.maxHeaderSize).isGreaterThan(0);
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE);
ListenerExecutionFailedException cause = new ListenerExecutionFailedException("Listener failed",
new RuntimeException(new String(new byte[200]).replace('\u0000', 'x')), null);
recoverer.recover(new Message("foo".getBytes(), new MessageProperties()),
cause);
Message received = template.receive(BIG_HEADER_QUEUE, 10_000);
assertThat(received).isNotNull();
String trace = received.getMessageProperties().getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString();
assertThat(trace).isEqualTo(getStackTraceAsString(cause));
ccf.destroy();
}

@Test
public void testBigMessageSmallTrace() {
CachingConnectionFactory ccf = new CachingConnectionFactory(
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
RabbitTemplate template = new RabbitTemplate(ccf);
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory())
- RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM;
assertThat(this.maxHeaderSize).isGreaterThan(0);
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE);
ListenerExecutionFailedException cause = new ListenerExecutionFailedException("Listener failed",
new RuntimeException(new String(new byte[this.maxHeaderSize]).replace('\u0000', 'x'),
new IllegalStateException("foo")), null);
recoverer.recover(new Message("foo".getBytes(), new MessageProperties()),
bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE)));
cause);
Message received = template.receive(BIG_HEADER_QUEUE, 10_000);
assertThat(received).isNotNull();
assertThat(((LongString) received.getMessageProperties().getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE)).length()).isEqualTo(this.maxHeaderSize);
String trace = received.getMessageProperties().getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE).toString();
assertThat(trace).contains("Caused by: java.lang.IllegalStateException");
String exceptionMessage =
received.getMessageProperties()
.getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE).toString();
assertThat(trace.length() + exceptionMessage.length()).isEqualTo(this.maxHeaderSize);
assertThat(exceptionMessage).endsWith("...");
ccf.destroy();
}

private Throwable bigCause(Throwable cause) {
if (getStackTraceAsString(cause).length() > this.maxHeaderSize) {
int length = getStackTraceAsString(cause).length();
int wantThisSize = this.maxHeaderSize + RepublishMessageRecoverer.DEFAULT_FRAME_MAX_HEADROOM;
if (length > wantThisSize) {
return cause;
}
return bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE, cause));
String msg = length + BIG_EXCEPTION_MESSAGE1.length() > wantThisSize
? BIG_EXCEPTION_MESSAGE1
: BIG_EXCEPTION_MESSAGE2;
return bigCause(new RuntimeException(msg, cause));
}

private String getStackTraceAsString(Throwable cause) {
Expand Down
10 changes: 10 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5644,6 +5644,16 @@ RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate
----
====

Starting with version 2.0.5, the stack trace may be truncated if it is too large; this is because all headers have to fit in a single frame.
By default, if the stack trace would cause less than 20,000 bytes ('headroom') to be available for other headers, it will be truncated.
This can be adjusted by setting the recoverer's `frameMaxHeadroom` property, if you need more or less space for other headers.
Starting with versions 2.1.13, 2.2.3, the exception message is included in this calculation, and the amount of stack trace will be maximized using the following algorithm:

* if the stack trace alone would exceed the limit, the exception message header will be truncated to 97 bytes plus `...` and the stack trace is truncated too.
* if the stack trace is small, the message will be truncated (plus `...`) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus `...`).

Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information.

Starting with version 2.1, an `ImmediateRequeueMessageRecoverer` is added to throw an `ImmediateRequeueAmqpException`, which notifies a listener container to requeue the current failed message.

===== Exception Classification for Spring Retry
Expand Down

0 comments on commit 5cadcc3

Please sign in to comment.