|
1 | 1 | /*
|
2 |
| - * Copyright 2002-2017 the original author or authors. |
| 2 | + * Copyright 2002-2018 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
17 | 17 | package org.springframework.amqp.rabbit.core;
|
18 | 18 |
|
19 | 19 | import static org.hamcrest.Matchers.containsString;
|
| 20 | +import static org.hamcrest.Matchers.equalTo; |
20 | 21 | import static org.junit.Assert.assertEquals;
|
21 | 22 | import static org.junit.Assert.assertSame;
|
22 | 23 | import static org.junit.Assert.assertThat;
|
|
25 | 26 | import static org.mockito.ArgumentMatchers.anyBoolean;
|
26 | 27 | import static org.mockito.ArgumentMatchers.anyString;
|
27 | 28 | import static org.mockito.ArgumentMatchers.isNull;
|
| 29 | +import static org.mockito.BDDMockito.given; |
| 30 | +import static org.mockito.BDDMockito.willReturn; |
28 | 31 | import static org.mockito.Mockito.doAnswer;
|
29 | 32 | import static org.mockito.Mockito.mock;
|
30 | 33 | import static org.mockito.Mockito.times;
|
31 | 34 | import static org.mockito.Mockito.verify;
|
32 | 35 | import static org.mockito.Mockito.when;
|
33 | 36 | import static org.mockito.Mockito.withSettings;
|
34 | 37 |
|
| 38 | +import java.util.Collections; |
35 | 39 | import java.util.HashMap;
|
36 | 40 | import java.util.Map;
|
37 | 41 | import java.util.concurrent.ExecutorService;
|
|
48 | 52 | import org.springframework.amqp.core.Address;
|
49 | 53 | import org.springframework.amqp.core.Message;
|
50 | 54 | import org.springframework.amqp.core.MessageProperties;
|
| 55 | +import org.springframework.amqp.core.Queue; |
51 | 56 | import org.springframework.amqp.core.ReceiveAndReplyCallback;
|
52 | 57 | import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
|
53 | 58 | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
| 59 | +import org.springframework.amqp.rabbit.connection.ChannelProxy; |
54 | 60 | import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory;
|
55 | 61 | import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
|
56 | 62 | import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
|
57 | 63 | import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
|
58 | 64 | import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
59 | 65 | import org.springframework.amqp.utils.SerializationUtils;
|
| 66 | +import org.springframework.context.ApplicationContext; |
60 | 67 | import org.springframework.expression.Expression;
|
61 | 68 | import org.springframework.expression.spel.standard.SpelExpressionParser;
|
62 | 69 | import org.springframework.retry.support.RetryTemplate;
|
|
74 | 81 | import com.rabbitmq.client.Consumer;
|
75 | 82 | import com.rabbitmq.client.Envelope;
|
76 | 83 | import com.rabbitmq.client.impl.AMQImpl;
|
| 84 | +import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk; |
77 | 85 |
|
78 | 86 | /**
|
79 | 87 | * @author Gary Russell
|
@@ -327,4 +335,65 @@ public void testRoutingConnectionFactory() throws Exception {
|
327 | 335 | Mockito.verify(connectionFactory2, Mockito.times(4)).createConnection();
|
328 | 336 | }
|
329 | 337 |
|
| 338 | + @Test |
| 339 | + public void testNestedTxBinding() throws Exception { |
| 340 | + ConnectionFactory cf = mock(ConnectionFactory.class); |
| 341 | + Connection connection = mock(Connection.class); |
| 342 | + Channel channel1 = mock(Channel.class, "channel1"); |
| 343 | + given(channel1.isOpen()).willReturn(true); |
| 344 | + Channel channel2 = mock(Channel.class, "channel2"); |
| 345 | + given(channel2.isOpen()).willReturn(true); |
| 346 | + willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString()); |
| 347 | + given(connection.isOpen()).willReturn(true); |
| 348 | + given(connection.createChannel()).willReturn(channel1, channel2); |
| 349 | + DeclareOk dok = new DeclareOk("foo", 0, 0); |
| 350 | + willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull()); |
| 351 | + CachingConnectionFactory ccf = new CachingConnectionFactory(cf); |
| 352 | + ccf.setExecutor(mock(ExecutorService.class)); |
| 353 | + RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf); |
| 354 | + rabbitTemplate.setChannelTransacted(true); |
| 355 | + RabbitAdmin admin = new RabbitAdmin(rabbitTemplate); |
| 356 | + ApplicationContext ac = mock(ApplicationContext.class); |
| 357 | + willReturn(Collections.singletonMap("foo", new Queue("foo"))).given(ac).getBeansOfType(Queue.class); |
| 358 | + admin.setApplicationContext(ac); |
| 359 | + admin.afterPropertiesSet(); |
| 360 | + AtomicReference<Channel> templateChannel = new AtomicReference<>(); |
| 361 | + new TransactionTemplate(new TestTransactionManager()).execute(s -> { |
| 362 | + return rabbitTemplate.execute(c -> { |
| 363 | + templateChannel.set(c); |
| 364 | + return true; |
| 365 | + }); |
| 366 | + }); |
| 367 | + verify(channel1).txSelect(); |
| 368 | + verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull()); |
| 369 | + assertThat(((ChannelProxy) templateChannel.get()).getTargetChannel(), equalTo(channel1)); |
| 370 | + verify(channel1).txCommit(); |
| 371 | + } |
| 372 | + |
| 373 | + @SuppressWarnings("serial") |
| 374 | + private class TestTransactionManager extends AbstractPlatformTransactionManager { |
| 375 | + |
| 376 | + TestTransactionManager() { |
| 377 | + super(); |
| 378 | + } |
| 379 | + |
| 380 | + @Override |
| 381 | + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { |
| 382 | + } |
| 383 | + |
| 384 | + @Override |
| 385 | + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { |
| 386 | + } |
| 387 | + |
| 388 | + @Override |
| 389 | + protected Object doGetTransaction() throws TransactionException { |
| 390 | + return new Object(); |
| 391 | + } |
| 392 | + |
| 393 | + @Override |
| 394 | + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { |
| 395 | + } |
| 396 | + |
| 397 | + } |
| 398 | + |
330 | 399 | }
|
0 commit comments