Skip to content

Commit

Permalink
AMQP-790: Fix after receive MPPs with send/receive
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-790

Previously, `afterReceivePostProcessors` were not called on `sendAndReceive()`
operations.

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java
  • Loading branch information
garyrussell authored and artembilan committed Dec 12, 2017
1 parent 5b7eebd commit ca32f3f
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ public ConnectionFactory getConnectionFactory() {
this.connectionFactory.setPort(this.port);
this.connectionFactory.setUsername(this.user);
this.connectionFactory.setPassword(this.password);
this.connectionFactory.setAutomaticRecoveryEnabled(false);
}
return this.connectionFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ public void convertAndSend(String routingKey, Object message, MessagePostProcess
}

public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
CorrelationData correlationData)
throws AmqpException {
convertAndSend(this.exchange, routingKey, message, messagePostProcessor, correlationData);
}
Expand All @@ -808,7 +808,7 @@ public void convertAndSend(String exchange, String routingKey, final Object mess
Message messageToSend = convertMessageIfNecessary(message);
messageToSend = messagePostProcessor instanceof CorrelationAwareMessagePostProcessor
? ((CorrelationAwareMessagePostProcessor) messagePostProcessor)
.postProcessMessage(messageToSend, correlationData)
.postProcessMessage(messageToSend, correlationData)
: messagePostProcessor.postProcessMessage(messageToSend);
send(exchange, routingKey, messageToSend, correlationData);
}
Expand Down Expand Up @@ -954,7 +954,7 @@ public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, fi

@Override
public <R, S> boolean receiveAndReply(final String queueName, ReceiveAndReplyCallback<R, S> callback, final String replyExchange,
final String replyRoutingKey) throws AmqpException {
final String replyRoutingKey) throws AmqpException {
return this.receiveAndReply(queueName, callback, new ReplyToAddressCallback<S>() {

@Override
Expand All @@ -973,13 +973,13 @@ public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, Re

@Override
public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
return doReceiveAndReply(queueName, callback, replyToAddressCallback);
}

@SuppressWarnings("unchecked")
private <R, S> boolean doReceiveAndReply(final String queueName, final ReceiveAndReplyCallback<R, S> callback,
final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
return this.execute(new ChannelCallback<Boolean>() {

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -1212,7 +1212,7 @@ protected Message convertSendAndReceiveRaw(final String exchange, final String r
if (messagePostProcessor != null) {
requestMessage = messagePostProcessor instanceof CorrelationAwareMessagePostProcessor
? ((CorrelationAwareMessagePostProcessor) messagePostProcessor)
.postProcessMessage(requestMessage, correlationData)
.postProcessMessage(requestMessage, correlationData)
: messagePostProcessor.postProcessMessage(requestMessage);
}
Message replyMessage = doSendAndReceive(exchange, routingKey, requestMessage, correlationData);
Expand Down Expand Up @@ -1279,13 +1279,18 @@ public Message doInRabbit(Channel channel) throws Exception {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
byte[] body) throws IOException {
MessageProperties messageProperties = RabbitTemplate.this.messagePropertiesConverter
.toMessageProperties(properties, envelope, RabbitTemplate.this.encoding);
Message reply = new Message(body, messageProperties);
if (logger.isTraceEnabled()) {
logger.trace("Message received " + reply);
}
if (RabbitTemplate.this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : RabbitTemplate.this.afterReceivePostProcessors) {
reply = processor.postProcessMessage(reply);
}
}
pendingReply.reply(reply);
}
};
Expand All @@ -1300,7 +1305,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
try {
channel.basicCancel(consumerTag);
}
catch (Exception e) { }
catch (Exception e) {
}
}
return reply;
}
Expand All @@ -1310,7 +1316,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message,
final CorrelationData correlationData) {
Assert.state(this.isListener, "RabbitTemplate is not configured as MessageListener - "
+ "cannot use a 'replyAddress': " + this.replyAddress);
+ "cannot use a 'replyAddress': " + this.replyAddress);
return this.execute(new ChannelCallback<Message>() {

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -1516,7 +1522,7 @@ protected void doSend(Channel channel, String exchange, String routingKey, Messa
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
messageToUse = processor instanceof CorrelationAwareMessagePostProcessor
? ((CorrelationAwareMessagePostProcessor) processor)
.postProcessMessage(messageToUse, correlationData)
.postProcessMessage(messageToUse, correlationData)
: processor.postProcessMessage(messageToUse);
}
}
Expand Down Expand Up @@ -1564,6 +1570,7 @@ protected boolean isChannelLocallyTransacted(Channel channel) {
private Message buildMessageFromDelivery(com.rabbitmq.client.QueueingConsumer.Delivery delivery) {
return buildMessage(delivery.getEnvelope(), delivery.getProperties(), delivery.getBody(), -1);
}

private Message buildMessageFromResponse(GetResponse response) {
return buildMessage(response.getEnvelope(), response.getProps(), response.getBody(), response.getMessageCount());
}
Expand Down Expand Up @@ -1640,7 +1647,7 @@ private void addListener(Channel channel) {
else {
throw new IllegalStateException(
"Channel does not support confirms or returns; " +
"is the connection factory configured for confirms or returns?");
"is the connection factory configured for confirms or returns?");
}
}

Expand All @@ -1663,7 +1670,7 @@ public void handleReturn(int replyCode,
String routingKey,
BasicProperties properties,
byte[] body)
throws IOException {
throws IOException {

ReturnCallback returnCallback = this.returnCallback;
if (returnCallback == null) {
Expand Down Expand Up @@ -1766,7 +1773,7 @@ public void onMessage(Message message) {
else {
if (savedCorrelation != null) {
message.getMessageProperties().setHeader(this.correlationKey,
savedCorrelation);
savedCorrelation);
}
else {
message.getMessageProperties().getHeaders().remove(this.correlationKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.core;

import static org.junit.Assert.assertTrue;

import org.junit.AfterClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author Gary Russell
* @since 1.7.6
*
*/
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {

private static final String QUEUE = "mpp.tests";

private static final String REPLIES = "mpp.tests.replies";

@ClassRule
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(QUEUE, REPLIES);

@Autowired
private RabbitTemplate template;

@Autowired
private Config config;

@AfterClass
public static void tearDown() {
brokerIsRunning.removeTestQueues();
}

@Test
public void testMPPsAppliedDirectReplyToContainerTests() {
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedDirectReplyToTests() {
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedTemporaryReplyQueueTests() {
this.template.setUseTemporaryReplyQueues(true);
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedReplyContainerTests() {
this.template.setReplyAddress(REPLIES);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.config.cf());
try {
container.setQueueNames(REPLIES);
container.setMessageListener(this.template);
container.setAfterReceivePostProcessors(this.config.afterMPP());
container.afterPropertiesSet();
container.start();
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}
finally {
container.stop();
}
}

@Configuration
@EnableRabbit
public static class Config {

private boolean beforeMppCalled;

private boolean afterMppCalled;

@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(brokerIsRunning.getConnectionFactory());
}

@Bean
public RabbitTemplate template() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf());
rabbitTemplate.setRoutingKey(QUEUE);
rabbitTemplate.setBeforePublishPostProcessors(m -> {
this.beforeMppCalled = true;
return m;
});
rabbitTemplate.setAfterReceivePostProcessors(afterMPP());
return rabbitTemplate;
}

@Bean
public MessagePostProcessor afterMPP() {
return m -> {
this.afterMppCalled = true;
return m;
};
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(cf());
return cf;
}

@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}

}

}

0 comments on commit ca32f3f

Please sign in to comment.