Skip to content

Commit

Permalink
merge: #12656
Browse files Browse the repository at this point in the history
12656: [Backport stable/8.2] fix: do not retake backup if it already exists r=deepthidevaki a=backport-action

# Description
Backport of #12626 to `stable/8.2`.

relates to #12623

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@users.noreply.github.com>
  • Loading branch information
3 people committed May 3, 2023
2 parents 2fb40dd + 4a262ca commit 958668f
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 72 deletions.
Expand Up @@ -8,11 +8,11 @@
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;

class BackupAlreadyExistsException extends RuntimeException {

BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) {
BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) {
super("Backup with id %s already exists, status of the backup is %s".formatted(id, status));
}
}
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -44,16 +45,22 @@ ActorFuture<Void> takeBackup(

backupsInProgress.add(inProgressBackup);

final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id());
final var checkCurrentBackup =
backupStore.list(
new BackupIdentifierWildcardImpl(
Optional.empty(),
Optional.of(inProgressBackup.id().partitionId()),
Optional.of(inProgressBackup.checkpointId())));

final ActorFuture<Void> backupSaved = concurrencyControl.createFuture();

checkCurrentBackup.whenCompleteAsync(
(status, error) -> {
(availableBackups, error) -> {
if (error != null) {
backupSaved.completeExceptionally(error);
} else {
takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved);
takeBackupIfDoesNotExist(
availableBackups, inProgressBackup, concurrencyControl, backupSaved);
}
},
concurrencyControl::run);
Expand All @@ -63,12 +70,16 @@ ActorFuture<Void> takeBackup(
}

private void takeBackupIfDoesNotExist(
final BackupStatus status,
final Collection<BackupStatus> availableBackups,
final InProgressBackup inProgressBackup,
final ConcurrencyControl concurrencyControl,
final ActorFuture<Void> backupSaved) {

switch (status.statusCode()) {
final BackupStatusCode existingBackupStatus =
availableBackups.isEmpty()
? BackupStatusCode.DOES_NOT_EXIST
: Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode();
switch (existingBackupStatus) {
case COMPLETED -> {
LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());
backupSaved.complete(null);
Expand All @@ -77,9 +88,9 @@ private void takeBackupIfDoesNotExist(
LOG.error(
"Backup {} already exists with status {}, will not take a new one",
inProgressBackup.id(),
status);
existingBackupStatus);
backupSaved.completeExceptionally(
new BackupAlreadyExistsException(inProgressBackup.id(), status));
new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus));
}
default -> {
final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
Expand Down

0 comments on commit 958668f

Please sign in to comment.