Skip to content

Commit

Permalink
Merge pull request #693 from vikinghawk/renameQueueOnRecovery
Browse files Browse the repository at this point in the history
Allow changing queue names during recovery
  • Loading branch information
michaelklishin committed Jul 3, 2021
2 parents f876392 + a19879b commit 5b740ce
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 13 deletions.
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -190,6 +191,8 @@ public class ConnectionFactory implements Cloneable {
* @since 5.4.0
*/
private RetryHandler topologyRecoveryRetryHandler;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier;

/**
* Traffic listener notified of inbound and outbound {@link Command}s.
Expand Down Expand Up @@ -1267,6 +1270,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
return result;
Expand Down Expand Up @@ -1648,6 +1652,15 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}

/**
* Set the recovered queue name supplier. Default is use the same queue name when recovering queues.
*
* @param recoveredQueueNameSupplier queue name supplier
*/
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
}

/**
* Traffic listener notified of inbound and outbound {@link Command}s.
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.TrafficListener;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

Expand Down Expand Up @@ -54,7 +55,8 @@ public class ConnectionParams {
private TopologyRecoveryFilter topologyRecoveryFilter;
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
private RetryHandler topologyRecoveryRetryHandler;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;

Expand Down Expand Up @@ -271,6 +273,14 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
public RetryHandler getTopologyRecoveryRetryHandler() {
return topologyRecoveryRetryHandler;
}

public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
}

public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
return recoveredQueueNameSupplier;
}

public void setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
arguments(arguments);
arguments(arguments).
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
recordQueue(queue, meta);

Expand Down Expand Up @@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
arguments(arguments);
arguments(arguments).
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
q.serverNamed(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

private final RetryHandler retryHandler;

private final RecoveredQueueNameSupplier recoveredQueueNameSupplier;

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
this(params, f, new ListAddressResolver(addrs));
Expand All @@ -119,6 +121,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
letAllPassFilter() : params.getTopologyRecoveryFilter();

this.retryHandler = params.getTopologyRecoveryRetryHandler();
this.recoveredQueueNameSupplier = params.getRecoveredQueueNameSupplier() == null ?
RecordedQueue.DEFAULT_QUEUE_NAME_SUPPLIER : params.getRecoveredQueueNameSupplier();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand Down Expand Up @@ -564,6 +568,10 @@ public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
this.consumerRecoveryListeners.remove(listener);
}

RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
return this.recoveredQueueNameSupplier;
}

private synchronized void beginAutomaticRecovery() throws InterruptedException {
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
Expand Down Expand Up @@ -782,11 +790,7 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
deleteRecordedQueue(oldName);
this.recordedQueues.put(newName, q);
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public class RecordedQueue extends RecordedNamedEntity {
public static final String EMPTY_STRING = "";

static final RecoveredQueueNameSupplier DEFAULT_QUEUE_NAME_SUPPLIER = q -> q.isServerNamed() ? EMPTY_STRING : q.name;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier = DEFAULT_QUEUE_NAME_SUPPLIER;
private boolean durable;
private boolean autoDelete;
private Map<String, Object> arguments;
Expand Down Expand Up @@ -58,11 +62,7 @@ public void recover() throws IOException {
}

public String getNameToUseForRecovery() {
if(isServerNamed()) {
return EMPTY_STRING;
} else {
return this.name;
}
return recoveredQueueNameSupplier.getNameToUseForRecovery(this);
}

public RecordedQueue durable(boolean value) {
Expand All @@ -80,6 +80,11 @@ public RecordedQueue arguments(Map<String, Object> value) {
return this;
}

public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
return this;
}

@Override
public String toString() {
return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.rabbitmq.client.impl.recovery;

/**
* Functional callback interface that can be used to rename a queue during topology recovery.
* Can use along with {@link QueueRecoveryListener} to know when such a queue has been recovered successfully.
*
* @see QueueRecoveryListener
*/
@FunctionalInterface
public interface RecoveredQueueNameSupplier {

/**
* Get the queue name to use when recovering this RecordedQueue entity
* @param recordedQueue the queue to be recovered
* @return new queue name
*/
String getNameToUseForRecovery(final RecordedQueue recordedQueue);
}

0 comments on commit 5b740ce

Please sign in to comment.