Skip to content

Commit

Permalink
GH-1108: @QueueBinding.key recursive resolution
Browse files Browse the repository at this point in the history
Fixes #1108

`@RabbitListener.queues()` resolves `String[]` recursively.
AMQP-722 added support for multiple routing keys to `@queueBinding` but
did not add support for recursive resolution. This is required to support
constructs like `key = "#{'${my-app.amqp.routing-key}'.split(',')}"`.

**cherry-pick to 2.1.x**

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessorTests.java

Fix test
  • Loading branch information
garyrussell authored and artembilan committed Oct 21, 2019
1 parent a69920d commit 53c9150
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private String[] resolveQueues(RabbitListener rabbitListener) {
List<String> result = new ArrayList<String>();
if (queues.length > 0) {
for (int i = 0; i < queues.length; i++) {
resolveAsString(resolveExpression(queues[i]), result);
resolveAsString(resolveExpression(queues[i]), result, true, "queues");
}
}
if (queuesToDeclare.length > 0) {
Expand All @@ -526,25 +526,28 @@ private String[] resolveQueues(RabbitListener rabbitListener) {
}

@SuppressWarnings("unchecked")
private void resolveAsString(Object resolvedValue, List<String> result) {
private void resolveAsString(Object resolvedValue, List<String> result, boolean canBeQueue, String what) {
Object resolvedValueToUse = resolvedValue;
if (resolvedValue instanceof String[]) {
resolvedValueToUse = Arrays.asList((String[]) resolvedValue);
}
if (resolvedValueToUse instanceof Queue) {
if (canBeQueue && resolvedValueToUse instanceof Queue) {
result.add(((Queue) resolvedValueToUse).getName());
}
else if (resolvedValueToUse instanceof String) {
result.add((String) resolvedValueToUse);
}
else if (resolvedValueToUse instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValueToUse) {
resolveAsString(object, result);
resolveAsString(object, result, canBeQueue, what);
}
}
else {
throw new IllegalArgumentException(String.format(
"@RabbitListener can't resolve '%s' as either a String or a Queue",
"@RabbitListener."
+ what
+ " can't resolve '%s' as a String[] or a String "
+ (canBeQueue ? "or a Queue" : ""),
resolvedValue));
}
}
Expand Down Expand Up @@ -632,15 +635,15 @@ private void declareExchangeAndBinding(QueueBinding binding, String queueName) {
}

private void registerBindings(QueueBinding binding, String queueName, String exchangeName, String exchangeType) {
final String[] routingKeys;
final List<String> routingKeys;
if (exchangeType.equals(ExchangeTypes.FANOUT) || binding.key().length == 0) {
routingKeys = new String[] { "" };
routingKeys = Collections.singletonList("");
}
else {
final int length = binding.key().length;
routingKeys = new String[length];
routingKeys = new ArrayList<>();
for (int i = 0; i < length; ++i) {
routingKeys[i] = resolveExpressionAsString(binding.key()[i], "@QueueBinding.key");
resolveAsString(resolveExpression(binding.key()[i]), routingKeys, false, "@QueueBinding.key");
}
}
final Map<String, Object> bindingArguments = resolveArguments(binding.arguments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ public void invalidValueInAnnotationTestBean() {
catch (BeanCreationException e) {
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), allOf(
containsString("@RabbitListener can't resolve"),
containsString("as either a String or a Queue")
containsString("@RabbitListener.queues can't resolve"),
containsString("as a String[] or a String or a Queue")
));
}
}
Expand All @@ -233,10 +233,11 @@ public void multipleRoutingKeysTestBean() {
assertThat(context.getBeansOfType(org.springframework.amqp.core.Exchange.class).values(), hasSize(1));

final List<Binding> bindings = new ArrayList<>(context.getBeansOfType(Binding.class).values());
assertThat(bindings, hasSize(2));
assertThat(bindings, hasSize(3));
bindings.sort(Comparator.comparing(Binding::getRoutingKey));
assertEquals("Binding [destination=my_queue, exchange=my_exchange, routingKey=red]", bindings.get(0).toString());
assertEquals("Binding [destination=my_queue, exchange=my_exchange, routingKey=yellow]", bindings.get(1).toString());
assertEquals("Binding [destination=my_queue, exchange=my_exchange, routingKey=green]", bindings.get(0).toString());
assertEquals("Binding [destination=my_queue, exchange=my_exchange, routingKey=red]", bindings.get(1).toString());
assertEquals("Binding [destination=my_queue, exchange=my_exchange, routingKey=yellow]", bindings.get(2).toString());

context.close();
}
Expand Down Expand Up @@ -394,8 +395,9 @@ public void handleIt(String body) {
static class MultipleRoutingKeysTestBean {

@RabbitListener(bindings = @QueueBinding(exchange = @Exchange("my_exchange"),
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "my_queue", arguments = @Argument(name = "foo", value = "bar")),
key = {"${xxxxxxx:red}", "yellow"}))
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "my_queue",
arguments = @Argument(name = "foo", value = "bar")),
key = {"${xxxxxxx:red}", "#{'yellow,green'.split(',')}"}))
public void handleIt(String body) {
}
}
Expand Down

0 comments on commit 53c9150

Please sign in to comment.