Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageListenerAdapter improve #990

Closed
wants to merge 19 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ else if (delegateListener instanceof MessageListener) {
}

// Invoke the handler method with appropriate arguments.
Object[] listenerArguments = buildListenerArguments(convertedMessage);
Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);
Object result = invokeListenerMethod(methodName, listenerArguments, message);
if (result != null) {
handleResult(new InvocationResult(result, null, null), message, channel);
Expand Down Expand Up @@ -344,6 +344,25 @@ protected Object[] buildListenerArguments(Object extractedMessage) {
return new Object[] {extractedMessage};
}

/**
* Build an array of arguments to be passed into the target listener method. Allows for multiple method arguments to
* be built from message object with channel, More detail about extractedMessage in the method
* #buildListenerArguments(java.lang.Object)
* <p>
* This can be overridden to treat special message content such as arrays differently, and add argument in case of
* receiving Channel and original Message object to invoke basicAck method in the listener by manual acknowledge
* mode
*
* @param extractedMessage the content of the message
* @param channel the Rabbit channel to operate on
* @param message the incoming Rabbit message
* @return the array of arguments to be passed into the listener method (each element of the array corresponding to
* a distinct method argument)
*/
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return buildListenerArguments(extractedMessage);
kc910521 marked this conversation as resolved.
Show resolved Hide resolved
kc910521 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Invoke the specified listener method.
* @param methodName the name of the listener method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,6 +31,7 @@
import org.junit.Before;
import org.junit.Test;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
Expand All @@ -54,6 +56,8 @@ public class MessageListenerAdapterTests {

private MessageListenerAdapter adapter;

private MessageListenerAdapter extendedAdapter;
kc910521 marked this conversation as resolved.
Show resolved Hide resolved

private final SimpleService simpleService = new SimpleService();

@Before
Expand All @@ -62,6 +66,28 @@ public void init() {
this.messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
this.adapter = new MessageListenerAdapter();
this.adapter.setMessageConverter(new SimpleMessageConverter());
this.extendedAdapter = new ExtendedListenerAdapter();
this.extendedAdapter.setMessageConverter(new SimpleMessageConverter());
}

@Test
public void testExtendedListenerAdapter() throws Exception {
final AtomicBoolean called = new AtomicBoolean(false);
Channel channel = mock(Channel.class);
class Delegate {
@SuppressWarnings("unused")
public void handleMessage(String input, Channel channel, Message message) throws IOException {
assertThat(input).isNotNull();
assertThat(channel).isNotNull();
assertThat(message).isNotNull();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
called.set(true);
}
}
this.extendedAdapter.setDelegate(new Delegate());
this.extendedAdapter.containerAckMode(AcknowledgeMode.MANUAL);
this.extendedAdapter.onMessage(new Message("foo".getBytes(), messageProperties), channel);
assertThat(called.get()).isTrue();
}

@Test
Expand Down Expand Up @@ -205,4 +231,12 @@ public String notDefinedOnInterface(String input) {
}

}

private class ExtendedListenerAdapter extends MessageListenerAdapter {
kc910521 marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}

}