Skip to content

Commit

Permalink
GH-703: DLC.adjustConsumers: Fix remove algorithm
Browse files Browse the repository at this point in the history
Fixes #703

When we adjust consumers down by more than 1 instance we end up with the
`IndexOutOfBoundsException` because we perform removal by the
calculated index.

* Change algorithm to remove only from `0` index.
In the end it doesn't matter which consumers remain in the container
  • Loading branch information
artembilan authored and garyrussell committed Jan 17, 2018
1 parent 71af6b1 commit 6761ab5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -284,11 +285,9 @@ private void adjustConsumers(int newCount) {
}
List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
if (consumerList != null && consumerList.size() > newCount) {
int currentCount = consumerList.size();
for (int i = newCount; i < currentCount; i++) {
SimpleConsumer consumer = consumerList.remove(i);
cancelConsumer(consumer);
}
IntStream.range(newCount, consumerList.size())
.mapToObj(i -> consumerList.remove(0))
.forEach(this::cancelConsumer);
}
}
}
Expand Down Expand Up @@ -555,9 +554,9 @@ private void doConsumeFromQueue(String queue) {
}
catch (Exception e) {
addConsumerToRestart(new SimpleConsumer(null, null, queue));
throw e instanceof AmqpConnectException
? (AmqpConnectException) e
: new AmqpConnectException(e);
throw e instanceof AmqpConnectException
? (AmqpConnectException) e
: new AmqpConnectException(e);
}
finally {
if (routingLookupKey != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -275,7 +275,7 @@ public void testAddRemoveConsumers() throws Exception {
cf.setExecutor(executor);
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
container.setQueueNames(Q1, Q2);
container.setConsumersPerQueue(2);
container.setConsumersPerQueue(4);
container.setMessageListener(new MessageListenerAdapter((ReplyingMessageListener<String, String>) in -> {
if ("foo".equals(in) || "bar".equals(in)) {
return in.toUpperCase();
Expand All @@ -291,8 +291,8 @@ public void testAddRemoveConsumers() throws Exception {
RabbitTemplate template = new RabbitTemplate(cf);
assertEquals("FOO", template.convertSendAndReceive(Q1, "foo"));
assertEquals("BAR", template.convertSendAndReceive(Q2, "bar"));
assertTrue(consumersOnQueue(Q1, 2));
assertTrue(consumersOnQueue(Q2, 2));
assertTrue(consumersOnQueue(Q1, 4));
assertTrue(consumersOnQueue(Q2, 4));
container.setConsumersPerQueue(1);
assertTrue(consumersOnQueue(Q1, 1));
assertTrue(consumersOnQueue(Q2, 1));
Expand Down

0 comments on commit 6761ab5

Please sign in to comment.