|
22 | 22 | import static org.hamcrest.Matchers.instanceOf;
|
23 | 23 | import static org.junit.Assert.assertEquals;
|
24 | 24 | import static org.junit.Assert.assertFalse;
|
| 25 | +import static org.junit.Assert.assertNotNull; |
25 | 26 | import static org.junit.Assert.assertNull;
|
26 | 27 | import static org.junit.Assert.assertSame;
|
27 | 28 | import static org.junit.Assert.assertThat;
|
|
55 | 56 | import org.mockito.ArgumentCaptor;
|
56 | 57 |
|
57 | 58 | import org.springframework.amqp.AmqpIOException;
|
| 59 | +import org.springframework.amqp.core.AcknowledgeMode; |
58 | 60 | import org.springframework.amqp.core.AnonymousQueue;
|
59 | 61 | import org.springframework.amqp.core.Message;
|
60 | 62 | import org.springframework.amqp.core.MessageListener;
|
|
64 | 66 | import org.springframework.amqp.rabbit.connection.Connection;
|
65 | 67 | import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
66 | 68 | import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
|
| 69 | +import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; |
67 | 70 | import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
68 | 71 | import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
69 | 72 | import org.springframework.amqp.rabbit.junit.BrokerRunning;
|
@@ -610,6 +613,32 @@ public void testErrorStopsContainer() throws Exception {
|
610 | 613 | assertFalse(this.container.isRunning());
|
611 | 614 | }
|
612 | 615 |
|
| 616 | + @Test |
| 617 | + public void testManualAckWithClosedChannel() throws Exception { |
| 618 | + final AtomicReference<IllegalStateException> exc = new AtomicReference<>(); |
| 619 | + final CountDownLatch latch = new CountDownLatch(1); |
| 620 | + this.container = createContainer((ChannelAwareMessageListener) (m, c) -> { |
| 621 | + if (exc.get() == null) { |
| 622 | + ((CachingConnectionFactory) this.template.getConnectionFactory()).resetConnection(); |
| 623 | + } |
| 624 | + try { |
| 625 | + c.basicAck(m.getMessageProperties().getDeliveryTag(), false); |
| 626 | + } |
| 627 | + catch (IllegalStateException e) { |
| 628 | + exc.set(e); |
| 629 | + } |
| 630 | + latch.countDown(); |
| 631 | + }, false, this.queue.getName()); |
| 632 | + this.container.setAcknowledgeMode(AcknowledgeMode.MANUAL); |
| 633 | + this.container.afterPropertiesSet(); |
| 634 | + this.container.start(); |
| 635 | + this.template.convertAndSend(this.queue.getName(), "foo"); |
| 636 | + assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| 637 | + this.container.stop(); |
| 638 | + assertNotNull(exc.get()); |
| 639 | + assertThat(exc.get().getMessage(), equalTo("Channel closed; cannot ack/nack")); |
| 640 | + } |
| 641 | + |
613 | 642 | private boolean containerStoppedForAbortWithBadListener() throws InterruptedException {
|
614 | 643 | Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
|
615 | 644 | new DirectFieldAccessor(container).setPropertyValue("logger", logger);
|
|
0 commit comments