Skip to content

Commit

Permalink
GH-920: Propagate afterReceive to AsyncRTemplate
Browse files Browse the repository at this point in the history
Fixes #920

The `AsyncRabbitTemplate` is missing an after receive post-processing
in its internal containers for replies

* Reuse `afterPostProcessors` from the provided `RabbitTemplate` and
propagate them into the internal listener containers for replies
* Fix `AbstractCompressingPostProcessor` do not mutate provided
`MessageProperties` and build a fresh instance.
* Demonstrate post-processors propagation by the `GZipPostProcessor`
& `GUnzipPostProcessor` configuration in the `AsyncRabbitTemplateTests`

**Cherry-pick to 2.1.x**

* Use `JavaUtils` for nullable settings in the `AsyncRabbitTemplate`
* Add `copyProperties` option to the `AbstractCompressingPostProcessor`
with `false` by default for better performance
* Document a `copyProperties` in the `amqp.adoc`
  • Loading branch information
artembilan authored and garyrussell committed Mar 29, 2019
1 parent 9224d34 commit 38358d3
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.core.Ordered;
import org.springframework.util.FileCopyUtils;

Expand All @@ -38,16 +39,20 @@
* present.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.4.2
*/
public abstract class AbstractCompressingPostProcessor implements MessagePostProcessor, Ordered {

private final Log logger = LogFactory.getLog(this.getClass());
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

private final boolean autoDecompress;

private int order;

private boolean copyProperties = false;

/**
* Construct a post processor that will include the
* {@link MessageProperties#SPRING_AUTO_DECOMPRESS} header set to 'true'.
Expand All @@ -68,23 +73,47 @@ public AbstractCompressingPostProcessor(boolean autoDecompress) {
this.autoDecompress = autoDecompress;
}

/**
* Flag to indicate if {@link MessageProperties} should be used as is or cloned for new message
* after compression.
* By default this flag is turned off for better performance since in most cases the original message
* is not used any more.
* @param copyProperties clone or reuse original message properties.
* @since 2.1.5
*/
public void setCopyProperties(boolean copyProperties) {
this.copyProperties = copyProperties;
}

@Override
public Message postProcessMessage(Message message) throws AmqpException {
ByteArrayOutputStream zipped = new ByteArrayOutputStream();
try {
ByteArrayOutputStream zipped = new ByteArrayOutputStream();
OutputStream zipper = getCompressorStream(zipped);
FileCopyUtils.copy(new ByteArrayInputStream(message.getBody()), zipper);
MessageProperties messageProperties = message.getMessageProperties();
String currentEncoding = messageProperties.getContentEncoding();
messageProperties
.setContentEncoding(getEncoding() + (currentEncoding == null ? "" : ":" + currentEncoding));
if (this.autoDecompress) {
messageProperties.setHeader(MessageProperties.SPRING_AUTO_DECOMPRESS, true);
}
byte[] compressed = zipped.toByteArray();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Compressed " + message.getBody().length + " to " + compressed.length);
}

MessageProperties originalProperties = message.getMessageProperties();

MessagePropertiesBuilder messagePropertiesBuilder =
this.copyProperties
? MessagePropertiesBuilder.fromClonedProperties(originalProperties)
: MessagePropertiesBuilder.fromProperties(originalProperties);

if (this.autoDecompress) {
messagePropertiesBuilder.setHeader(MessageProperties.SPRING_AUTO_DECOMPRESS, true);
}

MessageProperties messageProperties =
messagePropertiesBuilder.setContentEncoding(getEncoding() +
(originalProperties.getContentEncoding() == null
? ""
: ":" + originalProperties.getContentEncoding()))
.build();

return new Message(compressed, messageProperties);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
Expand Down Expand Up @@ -157,6 +158,10 @@ public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange,
this.template.setExchange(exchange == null ? "" : exchange);
this.template.setRoutingKey(routingKey);
this.container = new SimpleMessageListenerContainer(connectionFactory);
JavaUtils.INSTANCE
.acceptIfNotNull(this.template.getAfterReceivePostProcessors(),
(value) -> this.container.setAfterReceivePostProcessors(
value.toArray(new MessagePostProcessor[0])));
this.container.setQueueNames(replyQueue);
this.container.setMessageListener(this);
this.container.afterPropertiesSet();
Expand Down Expand Up @@ -216,15 +221,10 @@ public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerConta
* @since 2.0
*/
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey) {
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
this(new RabbitTemplate(connectionFactory));
Assert.notNull(routingKey, "'routingKey' cannot be null");
this.template = new RabbitTemplate(connectionFactory);
this.template.setExchange(exchange == null ? "" : exchange);
this.template.setRoutingKey(routingKey);
this.container = null;
this.replyAddress = null;
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
this.directReplyToContainer.setMessageListener(this);
}

/**
Expand All @@ -239,6 +239,10 @@ public AsyncRabbitTemplate(RabbitTemplate template) {
this.container = null;
this.replyAddress = null;
this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
JavaUtils.INSTANCE
.acceptIfNotNull(template.getAfterReceivePostProcessors(),
(value) -> this.directReplyToContainer.setAfterReceivePostProcessors(
value.toArray(new MessagePostProcessor[0])));
this.directReplyToContainer.setMessageListener(this);
}

Expand Down Expand Up @@ -582,9 +586,9 @@ public void onMessage(Message message, Channel channel) {
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
Object converted = rabbitFuture.getReturnType() != null
&& messageConverter instanceof SmartMessageConverter
? ((SmartMessageConverter) messageConverter).fromMessage(message,
rabbitFuture.getReturnType())
: messageConverter.fromMessage(message);
? ((SmartMessageConverter) messageConverter).fromMessage(message,
rabbitFuture.getReturnType())
: messageConverter.fromMessage(message);
rabbitFuture.set(converted);
}
else {
Expand Down Expand Up @@ -698,7 +702,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
AsyncRabbitTemplate.this.pending.remove(this.correlationId);
if (this.channelHolder != null && AsyncRabbitTemplate.this.directReplyToContainer != null) {
AsyncRabbitTemplate.this.directReplyToContainer.releaseConsumerFor(this.channelHolder, false, null); // NOSONAR
AsyncRabbitTemplate.this.directReplyToContainer
.releaseConsumerFor(this.channelHolder, false, null); // NOSONAR
}
return super.cancel(mayInterruptIfRunning);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void initDefaultStrategies() {
*
* @param exchange the exchange name to use for send operations
*/
public void setExchange(String exchange) {
public void setExchange(@Nullable String exchange) {
this.exchange = (exchange != null) ? exchange : DEFAULT_EXCHANGE;
}

Expand Down Expand Up @@ -645,6 +645,18 @@ public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePo
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
}

/**
* Return configured after receive {@link MessagePostProcessor}s or {@code null}.
* @return configured after receive {@link MessagePostProcessor}s or {@code null}.
* @since 2.1.5
*/
@Nullable
public Collection<MessagePostProcessor> getAfterReceivePostProcessors() {
return this.afterReceivePostProcessors != null
? Collections.unmodifiableCollection(this.afterReceivePostProcessors)
: null;
}

/**
* Add {@link MessagePostProcessor} that will be invoked immediately after a {@code Channel#basicGet()}
* and before any message conversion is performed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -117,7 +119,8 @@ public void testConvert1ArgDirect() throws Exception {
this.latch.set(null);
waitForZeroInUseConsumers();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class),
equalTo(2));
final String missingQueue = UUID.randomUUID().toString();
this.asyncDirectTemplate.convertSendAndReceive("", missingQueue, "foo"); // send to nowhere
Expand Down Expand Up @@ -174,12 +177,14 @@ public void testMessage1ArgDirect() throws Exception {
this.latch.set(null);
waitForZeroInUseConsumers();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class),
equalTo(2));
this.asyncDirectTemplate.stop();
this.asyncDirectTemplate.start();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount", Integer.class),
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class),
equalTo(0));
}

Expand Down Expand Up @@ -232,7 +237,8 @@ private Message getFooMessage() {
@DirtiesContext
public void testReturn() throws Exception {
this.asyncTemplate.setMandatory(true);
ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName() + "x", "foo");
ListenableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName() + "x",
"foo");
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected exception");
Expand Down Expand Up @@ -388,7 +394,7 @@ public void testStopCancelled() throws Exception {

private void checkConverterResult(ListenableFuture<String> future, String expected) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> resultRef = new AtomicReference<String>();
final AtomicReference<String> resultRef = new AtomicReference<>();
future.addCallback(new ListenableFutureCallback<String>() {

@Override
Expand All @@ -409,7 +415,7 @@ public void onFailure(Throwable ex) {

private Message checkMessageResult(ListenableFuture<Message> future, String expected) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message> resultRef = new AtomicReference<Message>();
final AtomicReference<Message> resultRef = new AtomicReference<>();
future.addCallback(new ListenableFutureCallback<Message>() {

@Override
Expand Down Expand Up @@ -482,24 +488,36 @@ public RabbitAdmin admin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}

@Bean
public GZipPostProcessor gZipPostProcessor() {
GZipPostProcessor gZipPostProcessor = new GZipPostProcessor();
gZipPostProcessor.setCopyProperties(true);
return gZipPostProcessor;
}

@Bean
public RabbitTemplate template(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setRoutingKey(requests().getName());
rabbitTemplate.addBeforePublishPostProcessors(gZipPostProcessor());
rabbitTemplate.addAfterReceivePostProcessors(new GUnzipPostProcessor());
return rabbitTemplate;
}

@Bean
public RabbitTemplate templateForDirect(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setRoutingKey(requests().getName());
rabbitTemplate.addBeforePublishPostProcessors(gZipPostProcessor());
rabbitTemplate.addAfterReceivePostProcessors(new GUnzipPostProcessor());
return rabbitTemplate;
}

@Bean
@Primary
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
container.setQueueNames(replies().getName());
return container;
}
Expand All @@ -518,7 +536,8 @@ public AsyncRabbitTemplate asyncDirectTemplate(RabbitTemplate templateForDirect)
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(requests().getName());
container.setMessageListener(
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
MessageListenerAdapter messageListener =
new MessageListenerAdapter((ReplyingMessageListener<String, String>)
message -> {
CountDownLatch countDownLatch = latch().get();
Expand All @@ -542,7 +561,10 @@ else if ("noReply".equals(message)) {
return null;
}
return message.toUpperCase();
}));
});

messageListener.setBeforeSendReplyPostProcessors(gZipPostProcessor());
container.setMessageListener(messageListener);
return container;
}

Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3435,6 +3435,11 @@ The second is invoked immediately after a message is received.
These extension points are used for such features as compression and, for this purpose, several `MessagePostProcessor` implementations are provided.
`GZipPostProcessor` and `ZipPostProcessor` compress messages before sending, and `GUnzipPostProcessor` and `UnzipPostProcessor` decompress received messages.

NOTE: Starting with version 2.1.5, the `GZipPostProcessor` can be configured with the `copyProperties = true` option to make a copy of the original message properties.
By default, these properties are reused for performance reasons, and modified with compression content encoding and the optional `MessageProperties.SPRING_AUTO_DECOMPRESS` header.
If you retain a reference to the original outbound message, its properties will change as well.
So, if your application retains a copy of an outbound message with these message post processors, consider turning the `copyProperties` option on.

Similarly, the `SimpleMessageListenerContainer` also has a `setAfterReceivePostProcessors()` method, letting the decompression be performed after messages are received by the container.

Starting with version 2.1.4, `addBeforePublishPostProcessors()` and `addAfterReceivePostProcessors()` have been added to the `RabbitTemplate` to allow appending new post processors to the list of before publish and after receive post processors respectively.
Expand Down

0 comments on commit 38358d3

Please sign in to comment.