Skip to content

Commit 221e3a9

Browse files
garyrussellartembilan
authored andcommittedDec 19, 2017
AMQP-793: Exception on ack for closed Channel
JIRA: https://jira.spring.io/browse/AMQP-793 Throw an exception to the caller when attempting to ack/nack a message on a closed channel.
1 parent 2626930 commit 221e3a9

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed
 

‎spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,11 @@ public class CachingConnectionFactory extends AbstractConnectionFactory
9696

9797
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
9898

99-
private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck", "basicNack",
100-
"basicReject"));
99+
private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck",
100+
"basicNack", "basicReject"));
101+
102+
private static final Set<String> ackMethods = new HashSet<>(Arrays.asList("basicAck",
103+
"basicNack", "basicReject"));
101104

102105
private static final Set<String> txEnds = new HashSet<>(Arrays.asList("txCommit", "txRollback"));
103106

@@ -957,11 +960,17 @@ else if (methodName.equals("isTransactional")) {
957960
if (this.target == null || !this.target.isOpen()) {
958961
if (this.target instanceof PublisherCallbackChannel) {
959962
this.target.close();
960-
throw new InvocationTargetException(new AmqpException("PublisherCallbackChannel is closed"));
963+
throw new InvocationTargetException(
964+
new AmqpException("PublisherCallbackChannel is closed"));
961965
}
962966
else if (this.txStarted) {
963967
this.txStarted = false;
964-
throw new IllegalStateException("Channel closed during transaction");
968+
throw new InvocationTargetException(
969+
new IllegalStateException("Channel closed during transaction"));
970+
}
971+
else if (ackMethods.contains(methodName)) {
972+
throw new InvocationTargetException(
973+
new IllegalStateException("Channel closed; cannot ack/nack"));
965974
}
966975
this.target = null;
967976
}

‎spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

+29
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.hamcrest.Matchers.instanceOf;
2323
import static org.junit.Assert.assertEquals;
2424
import static org.junit.Assert.assertFalse;
25+
import static org.junit.Assert.assertNotNull;
2526
import static org.junit.Assert.assertNull;
2627
import static org.junit.Assert.assertSame;
2728
import static org.junit.Assert.assertThat;
@@ -55,6 +56,7 @@
5556
import org.mockito.ArgumentCaptor;
5657

5758
import org.springframework.amqp.AmqpIOException;
59+
import org.springframework.amqp.core.AcknowledgeMode;
5860
import org.springframework.amqp.core.AnonymousQueue;
5961
import org.springframework.amqp.core.Message;
6062
import org.springframework.amqp.core.MessageListener;
@@ -64,6 +66,7 @@
6466
import org.springframework.amqp.rabbit.connection.Connection;
6567
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
6668
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
69+
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
6770
import org.springframework.amqp.rabbit.core.RabbitAdmin;
6871
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6972
import org.springframework.amqp.rabbit.junit.BrokerRunning;
@@ -610,6 +613,32 @@ public void testErrorStopsContainer() throws Exception {
610613
assertFalse(this.container.isRunning());
611614
}
612615

616+
@Test
617+
public void testManualAckWithClosedChannel() throws Exception {
618+
final AtomicReference<IllegalStateException> exc = new AtomicReference<>();
619+
final CountDownLatch latch = new CountDownLatch(1);
620+
this.container = createContainer((ChannelAwareMessageListener) (m, c) -> {
621+
if (exc.get() == null) {
622+
((CachingConnectionFactory) this.template.getConnectionFactory()).resetConnection();
623+
}
624+
try {
625+
c.basicAck(m.getMessageProperties().getDeliveryTag(), false);
626+
}
627+
catch (IllegalStateException e) {
628+
exc.set(e);
629+
}
630+
latch.countDown();
631+
}, false, this.queue.getName());
632+
this.container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
633+
this.container.afterPropertiesSet();
634+
this.container.start();
635+
this.template.convertAndSend(this.queue.getName(), "foo");
636+
assertTrue(latch.await(10, TimeUnit.SECONDS));
637+
this.container.stop();
638+
assertNotNull(exc.get());
639+
assertThat(exc.get().getMessage(), equalTo("Channel closed; cannot ack/nack"));
640+
}
641+
613642
private boolean containerStoppedForAbortWithBadListener() throws InterruptedException {
614643
Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
615644
new DirectFieldAccessor(container).setPropertyValue("logger", logger);

0 commit comments

Comments
 (0)
Please sign in to comment.