Skip to content

Commit

Permalink
merge: #12662
Browse files Browse the repository at this point in the history
12662: [Backport stable/8.1] fix: do not retake backup if it already exists r=oleschoenburg a=deepthidevaki

Backport of #12626 to stable/8.1.

relates to #12623

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@users.noreply.github.com>
  • Loading branch information
3 people committed May 4, 2023
2 parents 35bf212 + eaff494 commit 67b98e6
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 73 deletions.
Expand Up @@ -8,11 +8,11 @@
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;

public class BackupAlreadyExistsException extends RuntimeException {
class BackupAlreadyExistsException extends RuntimeException {

public BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) {
BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) {
super("Backup with id %s already exists, status of the backup is %s".formatted(id, status));
}
}
Expand Up @@ -18,6 +18,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -46,16 +47,22 @@ ActorFuture<Void> takeBackup(

backupsInProgress.add(inProgressBackup);

final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id());
final var checkCurrentBackup =
backupStore.list(
new BackupIdentifierWildcardImpl(
Optional.empty(),
Optional.of(inProgressBackup.id().partitionId()),
Optional.of(inProgressBackup.checkpointId())));

final ActorFuture<Void> backupSaved = concurrencyControl.createFuture();

checkCurrentBackup.whenCompleteAsync(
(status, error) -> {
(availableBackups, error) -> {
if (error != null) {
backupSaved.completeExceptionally(error);
} else {
takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved);
takeBackupIfDoesNotExist(
availableBackups, inProgressBackup, concurrencyControl, backupSaved);
}
},
concurrencyControl::run);
Expand All @@ -65,12 +72,16 @@ ActorFuture<Void> takeBackup(
}

private void takeBackupIfDoesNotExist(
final BackupStatus status,
final Collection<BackupStatus> availableBackups,
final InProgressBackup inProgressBackup,
final ConcurrencyControl concurrencyControl,
final ActorFuture<Void> backupSaved) {

switch (status.statusCode()) {
final BackupStatusCode existingBackupStatus =
availableBackups.isEmpty()
? BackupStatusCode.DOES_NOT_EXIST
: Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode();
switch (existingBackupStatus) {
case COMPLETED -> {
LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());
backupSaved.complete(null);
Expand All @@ -79,9 +90,9 @@ private void takeBackupIfDoesNotExist(
LOG.error(
"Backup {} already exists with status {}, will not take a new one",
inProgressBackup.id(),
status);
existingBackupStatus);
backupSaved.completeExceptionally(
new BackupAlreadyExistsException(inProgressBackup.id(), status));
new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus));
}
default -> {
final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
Expand Down

0 comments on commit 67b98e6

Please sign in to comment.