Skip to content

Commit

Permalink
AMQP-790: Fix after receive MPPs with send/receive
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-790

Previously, `afterReceivePostProcessors` were not called on `sendAndReceive()`
operations.
  • Loading branch information
garyrussell authored and artembilan committed Dec 12, 2017
1 parent dc0bbc4 commit c576b27
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ public ConnectionFactory getConnectionFactory() {
this.connectionFactory.setPort(this.port);
this.connectionFactory.setUsername(this.user);
this.connectionFactory.setPassword(this.password);
this.connectionFactory.setAutomaticRecoveryEnabled(false);
}
return this.connectionFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
if (logger.isTraceEnabled()) {
logger.trace("Message received " + reply);
}
if (RabbitTemplate.this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : RabbitTemplate.this.afterReceivePostProcessors) {
reply = processor.postProcessMessage(reply);
}
}
pendingReply.reply(reply);
}

Expand Down Expand Up @@ -1611,6 +1616,10 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M
if (this.taskExecutor != null) {
container.setTaskExecutor(this.taskExecutor);
}
if (this.afterReceivePostProcessors != null) {
container.setAfterReceivePostProcessors(this.afterReceivePostProcessors
.toArray(new MessagePostProcessor[this.afterReceivePostProcessors.size()]));
}
container.start();
this.directReplyToContainers.put(connectionFactory, container);
this.replyAddress = Address.AMQ_RABBITMQ_REPLY_TO;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.core;

import static org.junit.Assert.assertTrue;

import org.junit.AfterClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author Gary Russell
* @since 1.7.6
*
*/
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {

private static final String QUEUE = "mpp.tests";

private static final String REPLIES = "mpp.tests.replies";

@ClassRule
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(QUEUE, REPLIES);

@Autowired
private RabbitTemplate template;

@Autowired
private Config config;

@AfterClass
public static void tearDown() {
brokerIsRunning.removeTestQueues();
}

@Test // 2.0.x only
public void testMPPsAppliedDirectReplyToContainerTests() {
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedDirectReplyToTests() {
this.template.setUseDirectReplyToContainer(false);
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedTemporaryReplyQueueTests() {
this.template.setUseDirectReplyToContainer(false);
this.template.setUseTemporaryReplyQueues(true);
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}

@Test
public void testMPPsAppliedReplyContainerTests() {
this.template.setReplyAddress(REPLIES);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.config.cf());
try {
container.setQueueNames(REPLIES);
container.setMessageListener(this.template);
container.setAfterReceivePostProcessors(this.config.afterMPP());
container.afterPropertiesSet();
container.start();
this.template.sendAndReceive(new Message("foo".getBytes(), new MessageProperties()));
assertTrue("before MPP not called", this.config.beforeMppCalled);
assertTrue("after MPP not called", this.config.afterMppCalled);
}
finally {
container.stop();
}
}

@Configuration
@EnableRabbit
public static class Config {

private boolean beforeMppCalled;

private boolean afterMppCalled;

@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(brokerIsRunning.getConnectionFactory());
}

@Bean
public RabbitTemplate template() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf());
rabbitTemplate.setRoutingKey(QUEUE);
rabbitTemplate.setBeforePublishPostProcessors(m -> {
this.beforeMppCalled = true;
return m;
});
rabbitTemplate.setAfterReceivePostProcessors(afterMPP());
return rabbitTemplate;
}

@Bean
public MessagePostProcessor afterMPP() {
return m -> {
this.afterMppCalled = true;
return m;
};
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(cf());
return cf;
}

@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}

}

}

0 comments on commit c576b27

Please sign in to comment.