Topology recovery retry fixes for auto-delete queues #692

Merged: 1 commit, on Jul 3, 2021
@@ -759,49 +759,68 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
}
}


/**
* Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
* @param oldName queue name
* @param q recorded queue
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
*/
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
if (retry) {
final RecordedQueue entity = q;
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
entity.recover();
return null;
}).getRecordedEntity();
} else {
q.recover();
}
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
this.recordedQueues.put(newName, q);
}
}
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
LOGGER.debug("{} has recovered", q);
}
internalRecoverQueue(oldName, q, retry);
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
}
}

/**
* Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
* @param oldName queue name
* @param q recorded queue
* @throws Exception if an error occurs recovering the queue
*/
void recoverQueue(final String oldName, RecordedQueue q) throws Exception {
internalRecoverQueue(oldName, q, false);
}

private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean retry) throws Exception {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
if (retry) {
final RecordedQueue entity = q;
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
entity.recover();
return null;
}).getRecordedEntity();
} else {
q.recover();
}
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
this.recordedQueues.put(newName, q);
}
}
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
LOGGER.debug("{} has recovered", q);
}
}

public void recoverBinding(RecordedBinding b, boolean retry) {
try {
@@ -825,41 +844,61 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
}
}

/**
* Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
* @param tag consumer tag
* @param consumer recorded consumer
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
*/
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
String newTag = null;
if (retry) {
final RecordedConsumer entity = consumer;
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
newTag = (String) retryResult.getResult();
} else {
newTag = consumer.recover();
}

// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
}

for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
LOGGER.debug("{} has recovered", consumer);
}
internalRecoverConsumer(tag, consumer, retry);
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
}
}

/**
* Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
* @param tag consumer tag
* @param consumer recorded consumer
* @throws Exception if an error occurs recovering the consumer
*/
void recoverConsumer(final String tag, RecordedConsumer consumer) throws Exception {
internalRecoverConsumer(tag, consumer, false);
}

private void internalRecoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) throws Exception {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
String newTag = null;
if (retry) {
final RecordedConsumer entity = consumer;
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
newTag = (String) retryResult.getResult();
} else {
newTag = consumer.recover();
}

// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
}

for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
LOGGER.debug("{} has recovered", consumer);
}
}

private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
if (this.retryHandler == null) {
18 changes: 16 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java
@@ -37,6 +37,10 @@ public RecordedQueue exclusive(boolean value) {
this.exclusive = value;
return this;
}

public boolean isExclusive() {
return this.exclusive;
}

public RecordedQueue serverNamed(boolean value) {
this.serverNamed = value;
@@ -47,8 +51,6 @@ public boolean isServerNamed() {
return this.serverNamed;
}

public boolean isAutoDelete() { return this.autoDelete; }

public void recover() throws IOException {
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
this.durable,
@@ -69,17 +71,29 @@ public RecordedQueue durable(boolean value) {
this.durable = value;
return this;
}

public boolean isDurable() {
return this.durable;
}

public RecordedQueue autoDelete(boolean value) {
this.autoDelete = value;
return this;
}

public boolean isAutoDelete() {
return this.autoDelete;
}

public RecordedQueue arguments(Map<String, Object> value) {
this.arguments = value;
return this;
}

public Map<String, Object> getArguments() {
return arguments;
}

@Override
public String toString() {
return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]";
@@ -18,6 +18,9 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.Map.Entry;
import java.util.function.BiPredicate;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;

@@ -62,7 +65,7 @@ public abstract class TopologyRecoveryRetryLogic {
if (context.entity() instanceof RecordedQueue) {
final RecordedQueue recordedQueue = context.queue();
AutorecoveringConnection connection = context.connection();
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
PR author's inline comment:
Before: if this failed to recover the queue, the error was delivered to the exception handler and swallowed from the perspective of this retry logic, so the queue never got recovered.

Now: recoverQueue throws an exception if an error occurs, and the retry logic catches it and keeps retrying as long as the maximum retry attempts have not been reached.
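To make the new contract concrete, here is a minimal sketch of a retry loop that relies on the two-argument recoverQueue throwing. This is a hypothetical helper, not the client's actual DefaultRetryHandler, and it assumes same-package access because the two-argument overload is package-private:

    private void recoverQueueWithRetry(AutorecoveringConnection connection,
                                       RecordedQueue queue, int maxAttempts) throws Exception {
        String oldName = queue.getName();
        for (int attempt = 1; ; attempt++) {
            try {
                connection.recoverQueue(oldName, queue); // now throws instead of swallowing
                return;                                  // recovered, stop retrying
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e;                             // out of attempts, propagate the failure
                }
                // a real handler would apply its backoff policy before the next attempt
            }
        }
    }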

}
return null;
};
@@ -76,9 +79,7 @@ public abstract class TopologyRecoveryRetryLogic {
AutorecoveringConnection connection = context.connection();
RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination());
if (recordedQueue != null) {
connection.recoverQueue(
recordedQueue.getName(), recordedQueue, false
);
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
}
}
return null;
@@ -122,9 +123,7 @@ public abstract class TopologyRecoveryRetryLogic {
AutorecoveringConnection connection = context.connection();
RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue());
if (recordedQueue != null) {
connection.recoverQueue(
recordedQueue.getName(), recordedQueue, false
);
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
}
}
return null;
@@ -165,14 +164,52 @@ public abstract class TopologyRecoveryRetryLogic {
} else if (consumer.getChannel() == channel) {
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
RECOVER_CONSUMER_QUEUE.call(retryContext);
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer, false);
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer);
PR author's inline comment:
Similar concept here as with recoverQueue: we want this to throw an exception rather than deliver it to the ExceptionHandler and swallow it.

RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
}
}
return context.consumer().getConsumerTag();
}
return null;
};

/**
* Recover earlier auto-delete or exclusive queues that share the same channel as this retry context
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_AUTO_DELETE_QUEUES = context -> {
if (context.entity() instanceof RecordedQueue) {
AutorecoveringConnection connection = context.connection();
RecordedQueue queue = context.queue();
// recover all queues for the same channel that had already been recovered successfully before this queue failed.
            // If the previous ones were auto-delete or exclusive, they need to be recovered again
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
if (entry.getValue() == queue) {
// we have gotten to the queue in this context. Since this is an ordered map we can now break
// as we know we have recovered all the earlier queues on this channel
break;
} else if (queue.getChannel() == entry.getValue().getChannel()
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
connection.recoverQueue(entry.getKey(), entry.getValue());
}
}
} else if (context.entity() instanceof RecordedQueueBinding) {
AutorecoveringConnection connection = context.connection();
Set<String> queues = new LinkedHashSet<>();
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
if (context.entity().getChannel() == entry.getValue().getChannel()
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
connection.recoverQueue(entry.getKey(), entry.getValue());
queues.add(entry.getValue().getName());
}
}
for (final RecordedBinding binding : Utility.copy(connection.getRecordedBindings())) {
if (binding instanceof RecordedQueueBinding && queues.contains(binding.getDestination())) {
binding.recover();
}
}
}
return null;
};

/**
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
@@ -188,11 +225,13 @@ public abstract class TopologyRecoveryRetryLogic {
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.queueRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_QUEUE))
.andThen(RECOVER_QUEUE)
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_BINDING_QUEUE)
.andThen(RECOVER_BINDING)
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_CONSUMER_QUEUE)
.andThen(RECOVER_CONSUMER)
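For context beyond the diff, here is a hedged usage sketch showing how an application can opt into this retry behavior. It assumes the TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_RECOVERY_TIMEOUT pre-configured builder and its build() method as exposed by released versions of the Java client:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic;

    public class TopologyRetryExample {
        public static void main(String[] args) throws Exception {
            ConnectionFactory cf = new ConnectionFactory();
            cf.setAutomaticRecoveryEnabled(true);   // connection recovery
            cf.setTopologyRecoveryEnabled(true);    // queue/exchange/binding/consumer recovery
            // Pre-configured retry handler; with this PR its retry operations also
            // re-recover earlier auto-delete/exclusive queues on the same channel.
            cf.setTopologyRecoveryRetryHandler(
                TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_RECOVERY_TIMEOUT.build());
            try (Connection conn = cf.newConnection()) {
                // declare auto-delete or exclusive queues, bindings and consumers as usual
            }
        }
    }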