Skip to content

Commit

Permalink
merge: #12626
Browse files Browse the repository at this point in the history
12626: fix: do not retake backup if it already exists r=deepthidevaki a=deepthidevaki

## Description

In this PR, we check if the backup exists for that partition by listing using the wildcard. This returns the backups taken by other nodes as well. The backup is taken only if no backup exists for that partition with the same checkpointId. Previiously, we were checking using the full backupId which included the nodeId, so it can only find the backup taken by the same node. This is not ideal, because it can result in duplicate unnecessary backups, for example after restoring from a backup.

## Related issues

closes #12623 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@users.noreply.github.com>
  • Loading branch information
3 people committed May 3, 2023
2 parents 24edda5 + 65e08d9 commit 43b055b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 72 deletions.
Expand Up @@ -8,11 +8,11 @@
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;

class BackupAlreadyExistsException extends RuntimeException {

BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) {
BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) {
super("Backup with id %s already exists, status of the backup is %s".formatted(id, status));
}
}
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -44,16 +45,22 @@ ActorFuture<Void> takeBackup(

backupsInProgress.add(inProgressBackup);

final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id());
final var checkCurrentBackup =
backupStore.list(
new BackupIdentifierWildcardImpl(
Optional.empty(),
Optional.of(inProgressBackup.id().partitionId()),
Optional.of(inProgressBackup.checkpointId())));

final ActorFuture<Void> backupSaved = concurrencyControl.createFuture();

checkCurrentBackup.whenCompleteAsync(
(status, error) -> {
(availableBackups, error) -> {
if (error != null) {
backupSaved.completeExceptionally(error);
} else {
takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved);
takeBackupIfDoesNotExist(
availableBackups, inProgressBackup, concurrencyControl, backupSaved);
}
},
concurrencyControl::run);
Expand All @@ -63,12 +70,16 @@ ActorFuture<Void> takeBackup(
}

private void takeBackupIfDoesNotExist(
final BackupStatus status,
final Collection<BackupStatus> availableBackups,
final InProgressBackup inProgressBackup,
final ConcurrencyControl concurrencyControl,
final ActorFuture<Void> backupSaved) {

switch (status.statusCode()) {
final BackupStatusCode existingBackupStatus =
availableBackups.isEmpty()
? BackupStatusCode.DOES_NOT_EXIST
: Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode();
switch (existingBackupStatus) {
case COMPLETED -> {
LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());
backupSaved.complete(null);
Expand All @@ -77,9 +88,9 @@ private void takeBackupIfDoesNotExist(
LOG.error(
"Backup {} already exists with status {}, will not take a new one",
inProgressBackup.id(),
status);
existingBackupStatus);
backupSaved.completeExceptionally(
new BackupAlreadyExistsException(inProgressBackup.id(), status));
new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus));
}
default -> {
final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
Expand Down

0 comments on commit 43b055b

Please sign in to comment.