Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow changing queue names during recovery #693

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -190,6 +191,8 @@ public class ConnectionFactory implements Cloneable {
* @since 5.4.0
*/
private RetryHandler topologyRecoveryRetryHandler;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier;

/**
* Traffic listener notified of inbound and outbound {@link Command}s.
Expand Down Expand Up @@ -1267,6 +1270,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
return result;
Expand Down Expand Up @@ -1648,6 +1652,15 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}

/**
* Set the recovered queue name supplier. Default is use the same queue name when recovering queues.
*
* @param recoveredQueueNameSupplier queue name supplier
*/
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
}

/**
* Traffic listener notified of inbound and outbound {@link Command}s.
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.TrafficListener;
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

Expand Down Expand Up @@ -54,7 +55,8 @@ public class ConnectionParams {
private TopologyRecoveryFilter topologyRecoveryFilter;
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
private RetryHandler topologyRecoveryRetryHandler;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;

Expand Down Expand Up @@ -271,6 +273,14 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
public RetryHandler getTopologyRecoveryRetryHandler() {
return topologyRecoveryRetryHandler;
}

public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
}

public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
return recoveredQueueNameSupplier;
}

public void setTrafficListener(TrafficListener trafficListener) {
this.trafficListener = trafficListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
arguments(arguments);
arguments(arguments).
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
recordQueue(queue, meta);

Expand Down Expand Up @@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable,
durable(durable).
exclusive(exclusive).
autoDelete(autoDelete).
arguments(arguments);
arguments(arguments).
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
q.serverNamed(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

private final RetryHandler retryHandler;

private final RecoveredQueueNameSupplier recoveredQueueNameSupplier;

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
this(params, f, new ListAddressResolver(addrs));
Expand All @@ -119,6 +121,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
letAllPassFilter() : params.getTopologyRecoveryFilter();

this.retryHandler = params.getTopologyRecoveryRetryHandler();
this.recoveredQueueNameSupplier = params.getRecoveredQueueNameSupplier() == null ?
RecordedQueue.DEFAULT_QUEUE_NAME_SUPPLIER : params.getRecoveredQueueNameSupplier();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand Down Expand Up @@ -564,6 +568,10 @@ public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
this.consumerRecoveryListeners.remove(listener);
}

RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
return this.recoveredQueueNameSupplier;
}

private synchronized void beginAutomaticRecovery() throws InterruptedException {
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
Expand Down Expand Up @@ -782,11 +790,7 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
deleteRecordedQueue(oldName);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wanted to call out that this is removing the q.isServerNamed() check here. But i don't think that is an issue as i don't believe their would have historically been anyway for the name to change unless it was server named.

this.recordedQueues.put(newName, q);
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public class RecordedQueue extends RecordedNamedEntity {
public static final String EMPTY_STRING = "";

static final RecoveredQueueNameSupplier DEFAULT_QUEUE_NAME_SUPPLIER = q -> q.isServerNamed() ? EMPTY_STRING : q.name;

private RecoveredQueueNameSupplier recoveredQueueNameSupplier = DEFAULT_QUEUE_NAME_SUPPLIER;
private boolean durable;
private boolean autoDelete;
private Map<String, Object> arguments;
Expand Down Expand Up @@ -58,11 +62,7 @@ public void recover() throws IOException {
}

public String getNameToUseForRecovery() {
if(isServerNamed()) {
return EMPTY_STRING;
} else {
return this.name;
}
return recoveredQueueNameSupplier.getNameToUseForRecovery(this);
}

public RecordedQueue durable(boolean value) {
Expand All @@ -80,6 +80,11 @@ public RecordedQueue arguments(Map<String, Object> value) {
return this;
}

public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
return this;
}

@Override
public String toString() {
return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.rabbitmq.client.impl.recovery;

/**
* Functional callback interface that can be used to rename a queue during topology recovery.
* Can use along with {@link QueueRecoveryListener} to know when such a queue has been recovered successfully.
*
* @see QueueRecoveryListener
*/
@FunctionalInterface
public interface RecoveredQueueNameSupplier {

/**
* Get the queue name to use when recovering this RecordedQueue entity
* @param recordedQueue the queue to be recovered
* @return new queue name
*/
String getNameToUseForRecovery(final RecordedQueue recordedQueue);
}