Skip to content

Commit af7ee3f

Browse files
garyrussellartembilan
authored andcommittedJan 23, 2018
AMQP-796: Fix Admin Transaction
JIRA: https://jira.spring.io/browse/AMQP-796 If an admin uses a transactional `RabbitTemplate` it will start a transaction. If the connection was opened due to a `RabbitTemplate` operation it should participate in the same transaction. Previously, the template used a second channel and treated it as a local transaction. Also fix the `RabbitAdmin` so it does no work if there is nothing to declare.
1 parent 6761ab5 commit af7ee3f

File tree

5 files changed

+113
-4
lines changed

5 files changed

+113
-4
lines changed
 

‎spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -155,6 +155,18 @@ private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionF
155155
channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
156156
if (channel == null && connection == null) {
157157
connection = resourceFactory.createConnection();
158+
if (resourceHolder == null) {
159+
/*
160+
* While creating a connection, a connection listener might have created a
161+
* transactional channel and bound it to the transaction.
162+
*/
163+
resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
164+
.getResource(connectionFactory);
165+
if (resourceHolder != null) {
166+
channel = resourceHolder.getChannel();
167+
resourceHolderToUse = resourceHolder;
168+
}
169+
}
158170
resourceHolderToUse.addConnection(connection);
159171
}
160172
if (channel == null) {

‎spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -475,6 +475,10 @@ else if (declarable instanceof Binding) {
475475
}
476476
}
477477

478+
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
479+
this.logger.debug("Nothing to declare");
480+
return;
481+
}
478482
this.rabbitTemplate.execute(channel -> {
479483
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
480484
declareQueues(channel, queues.toArray(new Queue[queues.size()]));

‎spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java

+24
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import static org.mockito.ArgumentMatchers.any;
2525
import static org.mockito.ArgumentMatchers.anyBoolean;
2626
import static org.mockito.ArgumentMatchers.anyMap;
27+
import static org.mockito.ArgumentMatchers.anyString;
2728
import static org.mockito.ArgumentMatchers.eq;
2829
import static org.mockito.ArgumentMatchers.isNull;
30+
import static org.mockito.BDDMockito.given;
31+
import static org.mockito.BDDMockito.willReturn;
2932
import static org.mockito.Mockito.doAnswer;
3033
import static org.mockito.Mockito.mock;
3134
import static org.mockito.Mockito.never;
@@ -55,6 +58,7 @@
5558
import org.springframework.amqp.rabbit.connection.Connection;
5659
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5760
import org.springframework.amqp.rabbit.connection.ConnectionListener;
61+
import org.springframework.context.ApplicationContext;
5862
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5963
import org.springframework.context.annotation.Bean;
6064
import org.springframework.context.annotation.Configuration;
@@ -313,6 +317,26 @@ public void testAddRemove() {
313317
}
314318
}
315319

320+
@Test
321+
public void testNoOpWhenNothingToDeclare() throws Exception {
322+
com.rabbitmq.client.ConnectionFactory cf = mock(com.rabbitmq.client.ConnectionFactory.class);
323+
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
324+
Channel channel = mock(Channel.class, "channel1");
325+
given(channel.isOpen()).willReturn(true);
326+
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
327+
given(connection.isOpen()).willReturn(true);
328+
given(connection.createChannel()).willReturn(channel);
329+
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
330+
ccf.setExecutor(mock(ExecutorService.class));
331+
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
332+
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);
333+
ApplicationContext ac = mock(ApplicationContext.class);
334+
admin.setApplicationContext(ac);
335+
admin.afterPropertiesSet();
336+
ccf.createConnection();
337+
verify(connection, never()).createChannel();
338+
}
339+
316340
@Configuration
317341
public static class Config {
318342

‎spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

‎spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

+70-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.core;
1818

1919
import static org.hamcrest.Matchers.containsString;
20+
import static org.hamcrest.Matchers.equalTo;
2021
import static org.junit.Assert.assertEquals;
2122
import static org.junit.Assert.assertSame;
2223
import static org.junit.Assert.assertThat;
@@ -25,13 +26,16 @@
2526
import static org.mockito.ArgumentMatchers.anyBoolean;
2627
import static org.mockito.ArgumentMatchers.anyString;
2728
import static org.mockito.ArgumentMatchers.isNull;
29+
import static org.mockito.BDDMockito.given;
30+
import static org.mockito.BDDMockito.willReturn;
2831
import static org.mockito.Mockito.doAnswer;
2932
import static org.mockito.Mockito.mock;
3033
import static org.mockito.Mockito.times;
3134
import static org.mockito.Mockito.verify;
3235
import static org.mockito.Mockito.when;
3336
import static org.mockito.Mockito.withSettings;
3437

38+
import java.util.Collections;
3539
import java.util.HashMap;
3640
import java.util.Map;
3741
import java.util.concurrent.ExecutorService;
@@ -48,15 +52,18 @@
4852
import org.springframework.amqp.core.Address;
4953
import org.springframework.amqp.core.Message;
5054
import org.springframework.amqp.core.MessageProperties;
55+
import org.springframework.amqp.core.Queue;
5156
import org.springframework.amqp.core.ReceiveAndReplyCallback;
5257
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
5358
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
59+
import org.springframework.amqp.rabbit.connection.ChannelProxy;
5460
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory;
5561
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
5662
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
5763
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
5864
import org.springframework.amqp.support.converter.SimpleMessageConverter;
5965
import org.springframework.amqp.utils.SerializationUtils;
66+
import org.springframework.context.ApplicationContext;
6067
import org.springframework.expression.Expression;
6168
import org.springframework.expression.spel.standard.SpelExpressionParser;
6269
import org.springframework.retry.support.RetryTemplate;
@@ -74,6 +81,7 @@
7481
import com.rabbitmq.client.Consumer;
7582
import com.rabbitmq.client.Envelope;
7683
import com.rabbitmq.client.impl.AMQImpl;
84+
import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk;
7785

7886
/**
7987
* @author Gary Russell
@@ -327,4 +335,65 @@ public void testRoutingConnectionFactory() throws Exception {
327335
Mockito.verify(connectionFactory2, Mockito.times(4)).createConnection();
328336
}
329337

338+
@Test
339+
public void testNestedTxBinding() throws Exception {
340+
ConnectionFactory cf = mock(ConnectionFactory.class);
341+
Connection connection = mock(Connection.class);
342+
Channel channel1 = mock(Channel.class, "channel1");
343+
given(channel1.isOpen()).willReturn(true);
344+
Channel channel2 = mock(Channel.class, "channel2");
345+
given(channel2.isOpen()).willReturn(true);
346+
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
347+
given(connection.isOpen()).willReturn(true);
348+
given(connection.createChannel()).willReturn(channel1, channel2);
349+
DeclareOk dok = new DeclareOk("foo", 0, 0);
350+
willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
351+
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
352+
ccf.setExecutor(mock(ExecutorService.class));
353+
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
354+
rabbitTemplate.setChannelTransacted(true);
355+
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);
356+
ApplicationContext ac = mock(ApplicationContext.class);
357+
willReturn(Collections.singletonMap("foo", new Queue("foo"))).given(ac).getBeansOfType(Queue.class);
358+
admin.setApplicationContext(ac);
359+
admin.afterPropertiesSet();
360+
AtomicReference<Channel> templateChannel = new AtomicReference<>();
361+
new TransactionTemplate(new TestTransactionManager()).execute(s -> {
362+
return rabbitTemplate.execute(c -> {
363+
templateChannel.set(c);
364+
return true;
365+
});
366+
});
367+
verify(channel1).txSelect();
368+
verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
369+
assertThat(((ChannelProxy) templateChannel.get()).getTargetChannel(), equalTo(channel1));
370+
verify(channel1).txCommit();
371+
}
372+
373+
@SuppressWarnings("serial")
374+
private class TestTransactionManager extends AbstractPlatformTransactionManager {
375+
376+
TestTransactionManager() {
377+
super();
378+
}
379+
380+
@Override
381+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
382+
}
383+
384+
@Override
385+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
386+
}
387+
388+
@Override
389+
protected Object doGetTransaction() throws TransactionException {
390+
return new Object();
391+
}
392+
393+
@Override
394+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
395+
}
396+
397+
}
398+
330399
}

0 commit comments

Comments
 (0)
Please sign in to comment.