|
1 | 1 | /*
|
2 |
| - * Copyright 2016-2017 the original author or authors. |
| 2 | + * Copyright 2016-2018 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
31 | 31 | import java.util.concurrent.ScheduledFuture;
|
32 | 32 | import java.util.concurrent.TimeUnit;
|
33 | 33 | import java.util.stream.Collectors;
|
| 34 | +import java.util.stream.IntStream; |
34 | 35 | import java.util.stream.Stream;
|
35 | 36 |
|
36 | 37 | import org.apache.commons.logging.Log;
|
@@ -284,11 +285,9 @@ private void adjustConsumers(int newCount) {
|
284 | 285 | }
|
285 | 286 | List<SimpleConsumer> consumerList = this.consumersByQueue.get(queue);
|
286 | 287 | if (consumerList != null && consumerList.size() > newCount) {
|
287 |
| - int currentCount = consumerList.size(); |
288 |
| - for (int i = newCount; i < currentCount; i++) { |
289 |
| - SimpleConsumer consumer = consumerList.remove(i); |
290 |
| - cancelConsumer(consumer); |
291 |
| - } |
| 288 | + IntStream.range(newCount, consumerList.size()) |
| 289 | + .mapToObj(i -> consumerList.remove(0)) |
| 290 | + .forEach(this::cancelConsumer); |
292 | 291 | }
|
293 | 292 | }
|
294 | 293 | }
|
@@ -555,9 +554,9 @@ private void doConsumeFromQueue(String queue) {
|
555 | 554 | }
|
556 | 555 | catch (Exception e) {
|
557 | 556 | addConsumerToRestart(new SimpleConsumer(null, null, queue));
|
558 |
| - throw e instanceof AmqpConnectException |
559 |
| - ? (AmqpConnectException) e |
560 |
| - : new AmqpConnectException(e); |
| 557 | + throw e instanceof AmqpConnectException |
| 558 | + ? (AmqpConnectException) e |
| 559 | + : new AmqpConnectException(e); |
561 | 560 | }
|
562 | 561 | finally {
|
563 | 562 | if (routingLookupKey != null) {
|
|
0 commit comments