Skip to content

Commit 6761ab5

Browse files
artembilangaryrussell
authored andcommittedJan 17, 2018
GH-703: DLC.adjustConsumers: Fix remove algorithm
Fixes #703 When we adjust consumers down by more than 1 instance we end up with the `IndexOutOfBoundsException` because we perform removal by the calculated index. * Change algorithm to remove only from `0` index. In the end it doesn't matter which consumers remain in the container
1 parent 71af6b1 commit 6761ab5

File tree

2 files changed

+12
-13
lines changed

2 files changed

+12
-13
lines changed
 

‎spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ScheduledFuture;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.stream.Collectors;
34+
import java.util.stream.IntStream;
3435
import java.util.stream.Stream;
3536

3637
import org.apache.commons.logging.Log;
@@ -284,11 +285,9 @@ private void adjustConsumers(int newCount) {
284285
}
285286
List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
286287
if (consumerList != null && consumerList.size() > newCount) {
287-
int currentCount = consumerList.size();
288-
for (int i = newCount; i < currentCount; i++) {
289-
SimpleConsumer consumer = consumerList.remove(i);
290-
cancelConsumer(consumer);
291-
}
288+
IntStream.range(newCount, consumerList.size())
289+
.mapToObj(i -> consumerList.remove(0))
290+
.forEach(this::cancelConsumer);
292291
}
293292
}
294293
}
@@ -555,9 +554,9 @@ private void doConsumeFromQueue(String queue) {
555554
}
556555
catch (Exception e) {
557556
addConsumerToRestart(new SimpleConsumer(null, null, queue));
558-
throw e instanceof AmqpConnectException
559-
? (AmqpConnectException) e
560-
: new AmqpConnectException(e);
557+
throw e instanceof AmqpConnectException
558+
? (AmqpConnectException) e
559+
: new AmqpConnectException(e);
561560
}
562561
finally {
563562
if (routingLookupKey != null) {

‎spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -275,7 +275,7 @@ public void testAddRemoveConsumers() throws Exception {
275275
cf.setExecutor(executor);
276276
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
277277
container.setQueueNames(Q1, Q2);
278-
container.setConsumersPerQueue(2);
278+
container.setConsumersPerQueue(4);
279279
container.setMessageListener(new MessageListenerAdapter((ReplyingMessageListener<String, String>) in -> {
280280
if ("foo".equals(in) || "bar".equals(in)) {
281281
return in.toUpperCase();
@@ -291,8 +291,8 @@ public void testAddRemoveConsumers() throws Exception {
291291
RabbitTemplate template = new RabbitTemplate(cf);
292292
assertEquals("FOO", template.convertSendAndReceive(Q1, "foo"));
293293
assertEquals("BAR", template.convertSendAndReceive(Q2, "bar"));
294-
assertTrue(consumersOnQueue(Q1, 2));
295-
assertTrue(consumersOnQueue(Q2, 2));
294+
assertTrue(consumersOnQueue(Q1, 4));
295+
assertTrue(consumersOnQueue(Q2, 4));
296296
container.setConsumersPerQueue(1);
297297
assertTrue(consumersOnQueue(Q1, 1));
298298
assertTrue(consumersOnQueue(Q2, 1));

0 commit comments

Comments
 (0)
Please sign in to comment.