Skip to content

Commit

Permalink
AMQP-813: Requeue after retry exhausted option
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-813

* Add `ImmediateRequeueAmqpException` to let `ContainerUtils.shouldRequeue()`
to return `true` immediately and requeue the message in the container
* Add `ImmediateRequeueMessageRecoverer` to throw a mentioned above
`ImmediateRequeueAmqpException`
* Refactor `RetryInterceptorBuilder` to avoid duplicated code
* Mention changes in the Docs

* Mention `ImmediateRequeueAmqpException` in Docs alongside
with the `defaultRequeueRejected`

Doc Polishing
  • Loading branch information
artembilan authored and garyrussell committed Sep 21, 2018
1 parent 88f57fa commit d75777d
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp;

/**
* The special {@link AmqpException} to be thrown from the listener (e.g. retry recoverer callback)
* to {@code requeue) failed message.
*
* @author Artem Bilan
*
* @since 2.1
*/
@SuppressWarnings("serial")
public class ImmediateRequeueAmqpException extends AmqpException {

public ImmediateRequeueAmqpException(String message) {
super(message);
}

public ImmediateRequeueAmqpException(Throwable cause) {
super(cause);
}

public ImmediateRequeueAmqpException(String message, Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,10 +63,12 @@
*
* @author James Carr
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.3
*
*/
public abstract class RetryInterceptorBuilder<T extends MethodInterceptor> {
public abstract class RetryInterceptorBuilder<B extends RetryInterceptorBuilder<B, T>, T extends MethodInterceptor> {

private RetryOperations retryOperations;

Expand Down Expand Up @@ -100,16 +102,21 @@ public static StatelessRetryInterceptorBuilder stateless() {
return new StatelessRetryInterceptorBuilder();
}

@SuppressWarnings("unchecked")
protected final B _this() {
return (B) this;
}

/**
* Apply the retry operations - once this is set, other properties can no longer be set; can't
* be set if other properties have been applied.
* @param retryOperations The retry operations.
* @return this.
*/
public RetryInterceptorBuilder<T> retryOperations(RetryOperations retryOperations) {
public B retryOperations(RetryOperations retryOperations) {
Assert.isTrue(!this.templateAltered, "Cannot set retryOperations when the default has been modified");
this.retryOperations = retryOperations;
return this;
return _this();
}

/**
Expand All @@ -118,13 +125,13 @@ public RetryInterceptorBuilder<T> retryOperations(RetryOperations retryOperation
* @param maxAttempts the max attempts.
* @return this.
*/
public RetryInterceptorBuilder<T> maxAttempts(int maxAttempts) {
public B maxAttempts(int maxAttempts) {
Assert.isNull(this.retryOperations, "cannot alter the retry policy when a custom retryOperations has been set");
Assert.isTrue(!this.retryPolicySet, "cannot alter the retry policy when a custom retryPolicy has been set");
this.simpleRetryPolicy.setMaxAttempts(maxAttempts);
this.retryTemplate.setRetryPolicy(this.simpleRetryPolicy);
this.templateAltered = true;
return this;
return _this();
}

/**
Expand All @@ -134,7 +141,7 @@ public RetryInterceptorBuilder<T> maxAttempts(int maxAttempts) {
* @param maxInterval The max interval.
* @return this.
*/
public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double multiplier, long maxInterval) {
public B backOffOptions(long initialInterval, double multiplier, long maxInterval) {
Assert.isNull(this.retryOperations, "cannot set the back off policy when a custom retryOperations has been set");
Assert.isTrue(!this.backOffPolicySet, "cannot set the back off options when a back off policy has been set");
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
Expand All @@ -144,7 +151,7 @@ public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double mu
this.retryTemplate.setBackOffPolicy(policy);
this.backOffOptionsSet = true;
this.templateAltered = true;
return this;
return _this();
}

/**
Expand All @@ -153,37 +160,37 @@ public RetryInterceptorBuilder<T> backOffOptions(long initialInterval, double mu
* @param policy The policy.
* @return this.
*/
public RetryInterceptorBuilder<T> retryPolicy(RetryPolicy policy) {
public B retryPolicy(RetryPolicy policy) {
Assert.isNull(this.retryOperations, "cannot set the retry policy when a custom retryOperations has been set");
Assert.isTrue(!this.templateAltered, "cannot set the retry policy if max attempts or back off policy or options changed");
this.retryTemplate.setRetryPolicy(policy);
this.retryPolicySet = true;
this.templateAltered = true;
return this;
return _this();
}

/**
* Apply the back off policy. Cannot be used if a custom retry operations, or back off policy has been applied.
* @param policy The policy.
* @return this.
*/
public RetryInterceptorBuilder<T> backOffPolicy(BackOffPolicy policy) {
public B backOffPolicy(BackOffPolicy policy) {
Assert.isNull(this.retryOperations, "cannot set the back off policy when a custom retryOperations has been set");
Assert.isTrue(!this.backOffOptionsSet, "cannot set the back off policy when the back off policy options have been set");
this.retryTemplate.setBackOffPolicy(policy);
this.templateAltered = true;
this.backOffPolicySet = true;
return this;
return _this();
}

/**
* Apply a Message recoverer - default is to log and discard after retry is exhausted.
* @param recoverer The recoverer.
* @return this.
*/
public RetryInterceptorBuilder<T> recoverer(MessageRecoverer recoverer) {
public B recoverer(MessageRecoverer recoverer) {
this.messageRecoverer = recoverer;
return this;
return _this();
}

protected void applyCommonSettings(AbstractRetryOperationsInterceptorFactoryBean factoryBean) {
Expand All @@ -204,7 +211,8 @@ protected void applyCommonSettings(AbstractRetryOperationsInterceptorFactoryBean
/**
* Builder for a stateful interceptor.
*/
public static final class StatefulRetryInterceptorBuilder extends RetryInterceptorBuilder<StatefulRetryOperationsInterceptor> {
public static final class StatefulRetryInterceptorBuilder
extends RetryInterceptorBuilder<StatefulRetryInterceptorBuilder, StatefulRetryOperationsInterceptor> {

private final StatefulRetryOperationsInterceptorFactoryBean factoryBean =
new StatefulRetryOperationsInterceptorFactoryBean();
Expand Down Expand Up @@ -238,44 +246,6 @@ public StatefulRetryInterceptorBuilder newMessageIdentifier(NewMessageIdentifier
return this;
}

@Override
public StatefulRetryInterceptorBuilder retryOperations(
RetryOperations retryOperations) {
super.retryOperations(retryOperations);
return this;
}

@Override
public StatefulRetryInterceptorBuilder maxAttempts(int maxAttempts) {
super.maxAttempts(maxAttempts);
return this;
}

@Override
public StatefulRetryInterceptorBuilder backOffOptions(long initialInterval,
double multiplier, long maxInterval) {
super.backOffOptions(initialInterval, multiplier, maxInterval);
return this;
}

@Override
public StatefulRetryInterceptorBuilder retryPolicy(RetryPolicy policy) {
super.retryPolicy(policy);
return this;
}

@Override
public StatefulRetryInterceptorBuilder backOffPolicy(BackOffPolicy policy) {
super.backOffPolicy(policy);
return this;
}

@Override
public StatefulRetryInterceptorBuilder recoverer(MessageRecoverer recoverer) {
super.recoverer(recoverer);
return this;
}

@Override
public StatefulRetryOperationsInterceptor build() {
this.applyCommonSettings(this.factoryBean);
Expand All @@ -295,7 +265,7 @@ public StatefulRetryOperationsInterceptor build() {
* Builder for a stateless interceptor.
*/
public static final class StatelessRetryInterceptorBuilder
extends RetryInterceptorBuilder<RetryOperationsInterceptor> {
extends RetryInterceptorBuilder<StatelessRetryInterceptorBuilder, RetryOperationsInterceptor> {

private final StatelessRetryOperationsInterceptorFactoryBean factoryBean =
new StatelessRetryOperationsInterceptorFactoryBean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.apache.commons.logging.Log;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateRequeueAmqpException;

/**
* Utility methods for listener containers.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*
Expand All @@ -45,7 +47,8 @@ private ContainerUtils() {
*/
public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
boolean shouldRequeue = defaultRequeueRejected ||
throwable instanceof MessageRejectedWhileStoppingException;
throwable instanceof MessageRejectedWhileStoppingException ||
throwable instanceof ImmediateRequeueAmqpException;
Throwable t = throwable;
while (shouldRequeue && t != null) {
if (t instanceof AmqpRejectAndDontRequeueException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.retry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.ImmediateRequeueAmqpException;
import org.springframework.amqp.core.Message;

/**
* The {@link MessageRecoverer} implementation to throw an {@link ImmediateRequeueAmqpException}
* for subsequent requeuing in the listener container.
*
* @author Artem Bilan
*
* @since 2.1
*/
public class ImmediateRequeueMessageRecoverer implements MessageRecoverer {

protected Log logger = LogFactory.getLog(ImmediateRequeueMessageRecoverer.class);

@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message + "; requeuing...", cause);
}
throw new ImmediateRequeueAmqpException(cause);
}

}
Loading

0 comments on commit d75777d

Please sign in to comment.