osd: check if osd is ok-to-stop before removal
If multiple removal jobs are fired in parallel, there is a risk of
losing data since we forcefully remove the OSD. So we now check that
the OSD is ok-to-stop before proceeding. The code waits forever,
retrying every minute until the OSD is safe to stop.

Signed-off-by: Sébastien Han <seb@redhat.com>
leseb committed Nov 23, 2021
1 parent 801b5d6 commit 36b8bb9
Showing 2 changed files with 39 additions and 9 deletions.
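
The commit message above describes a simple gate: block until the Ceph "osd ok-to-stop" check reports success, retrying once per minute. Below is a minimal standalone sketch of that pattern, shelling out to the Ceph CLI directly instead of going through Rook's client.OSDOkToStop wrapper; the exact CLI invocation and its exit behavior are assumptions to verify against your Ceph release, not Rook's actual implementation.

```go
package main

import (
	"fmt"
	"log"
	"os/exec"
	"strconv"
	"time"
)

// waitUntilOkToStop blocks until `ceph osd ok-to-stop <id>` succeeds,
// retrying once per minute. The CLI is assumed to exit non-zero while
// stopping the OSD would make data unavailable, which is treated here
// as "not yet safe".
func waitUntilOkToStop(osdID int) {
	for {
		// Hypothetical direct CLI call; Rook wraps this via client.OSDOkToStop.
		out, err := exec.Command("ceph", "osd", "ok-to-stop", strconv.Itoa(osdID)).CombinedOutput()
		if err == nil {
			log.Printf("osd.%d is ok to stop", osdID)
			return
		}
		log.Printf("osd.%d is not ok to stop yet, retrying in 1m: %v (%s)", osdID, err, out)
		time.Sleep(1 * time.Minute)
	}
}

func main() {
	waitUntilOkToStop(0)
	fmt.Println("safe to proceed with removal")
}
```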
14 changes: 10 additions & 4 deletions pkg/daemon/ceph/client/crash.go
@@ -57,26 +57,31 @@ type CrashList struct {

// GetCrashList gets the list of Crashes.
func GetCrashList(context *clusterd.Context, clusterInfo *ClusterInfo) ([]CrashList, error) {
-crashargs := []string{"crash", "ls"}
-output, err := NewCephCommand(context, clusterInfo, crashargs).Run()
+crashArgs := []string{"crash", "ls"}
+output, err := NewCephCommand(context, clusterInfo, crashArgs).Run()
if err != nil {
return nil, errors.Wrap(err, "failed to list ceph crash")
}

var crash []CrashList
err = json.Unmarshal(output, &crash)
if err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal crash ls response. %s", string(output))
}

return crash, err
}

// ArchiveCrash archives the crash with respective crashID
func ArchiveCrash(context *clusterd.Context, clusterInfo *ClusterInfo, crashID string) error {
-crashsilenceargs := []string{"crash", "archive", crashID}
-_, err := NewCephCommand(context, clusterInfo, crashsilenceargs).Run()
+logger.Infof("silencing crash %q", crashID)
+crashSilenceArgs := []string{"crash", "archive", crashID}
+_, err := NewCephCommand(context, clusterInfo, crashSilenceArgs).Run()
if err != nil {
return errors.Wrapf(err, "failed to archive crash %q", crashID)
}

logger.Infof("successfully silenced crash %q", crashID)
return nil
}

@@ -86,5 +91,6 @@ func GetCrash(context *clusterd.Context, clusterInfo *ClusterInfo) ([]CrashList,
if err != nil {
return nil, errors.Wrap(err, "failed to list ceph crash")
}

return crash, nil
}
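
For reference, GetCrashList unmarshals the JSON printed by "ceph crash ls", and the removal path in remove.go below relies on only two of CrashList's fields (ID and Entity). Here is a small self-contained sketch of that lookup; the JSON tags ("crash_id", "entity_name") and the sample payload are assumptions for illustration, not Rook's actual CrashList definition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// crashEntry mirrors the two fields of CrashList that the removal code uses.
// The JSON tags are assumptions about the `ceph crash ls` output format;
// verify them against your Ceph version.
type crashEntry struct {
	ID     string `json:"crash_id"`
	Entity string `json:"entity_name"`
}

func main() {
	// Hypothetical trimmed-down `ceph crash ls --format json` output.
	raw := []byte(`[{"crash_id": "2021-11-23_10:00:00.000000Z_abc", "entity_name": "osd.3"}]`)

	var crashes []crashEntry
	if err := json.Unmarshal(raw, &crashes); err != nil {
		fmt.Println("failed to unmarshal crash ls response:", err)
		return
	}

	// Pick the crash that belongs to the OSD being removed, as archiveCrash does.
	osdID := 3
	for _, c := range crashes {
		if c.Entity == fmt.Sprintf("osd.%d", osdID) {
			fmt.Printf("would archive crash %q with: ceph crash archive %s\n", c.ID, c.ID)
		}
	}
}
```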
34 changes: 29 additions & 5 deletions pkg/daemon/ceph/osd/remove.go
@@ -19,6 +19,7 @@ package osd
import (
"fmt"
"strconv"
"time"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -58,8 +59,25 @@ func RemoveOSDs(context *clusterd.Context, clusterInfo *client.ClusterInfo, osds
if status == upStatus {
logger.Infof("osd.%d is healthy. It cannot be removed unless it is 'down'", osdID)
continue
+} else {
+logger.Infof("osd.%d is marked 'DOWN'", osdID)
}

+// Check that we can remove the OSD in case removal jobs are fired in parallel.
+// Loop forever until the osd is ok-to-stop (the job might eventually time out, but that's
+// better than losing the osd data...)
+for {
+_, err := client.OSDOkToStop(context, clusterInfo, osdID, 1)
+if err != nil {
+logger.Errorf("failed to check if osd %d is ok to stop, retrying in 1m. %v", osdID, err)
+time.Sleep(1 * time.Minute)
+} else {
+logger.Infof("osd.%d is ok to stop", osdID)
+break
+}
+}
logger.Infof("osd.%d is marked 'DOWN'. Removing it", osdID)

logger.Infof("OSD %d is ready to be removed, proceeding", osdID)
removeOSD(context, clusterInfo, osdID, preservePVC)
}

@@ -74,6 +92,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
}

// Mark the OSD as out.
logger.Infof("marking osd.%d out", osdID)
args := []string{"osd", "out", fmt.Sprintf("osd.%d", osdID)}
_, err = client.NewCephCommand(clusterdContext, clusterInfo, args).Run()
if err != nil {
@@ -138,18 +157,21 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
}

// purge the osd
-purgeosdargs := []string{"osd", "purge", fmt.Sprintf("osd.%d", osdID), "--force", "--yes-i-really-mean-it"}
-_, err = client.NewCephCommand(clusterdContext, clusterInfo, purgeosdargs).Run()
+logger.Infof("purging osd.%d", osdID)
+purgeOSDArgs := []string{"osd", "purge", fmt.Sprintf("osd.%d", osdID), "--force", "--yes-i-really-mean-it"}
+_, err = client.NewCephCommand(clusterdContext, clusterInfo, purgeOSDArgs).Run()
if err != nil {
logger.Errorf("failed to purge osd.%d. %v", osdID, err)
}

// Attempting to remove the parent host. Errors can be ignored if there are other OSDs on the same host
-hostargs := []string{"osd", "crush", "rm", hostName}
-_, err = client.NewCephCommand(clusterdContext, clusterInfo, hostargs).Run()
+logger.Infof("removing osd.%d from ceph", osdID)
+hostArgs := []string{"osd", "crush", "rm", hostName}
+_, err = client.NewCephCommand(clusterdContext, clusterInfo, hostArgs).Run()
if err != nil {
logger.Errorf("failed to remove CRUSH host %q. %v", hostName, err)
}

// call archiveCrash to silence crash warning in ceph health if any
archiveCrash(clusterdContext, clusterInfo, osdID)

Expand All @@ -167,13 +189,15 @@ func archiveCrash(clusterdContext *clusterd.Context, clusterInfo *client.Cluster
logger.Info("no ceph crash to silence")
return
}

var crashID string
for _, c := range crash {
if c.Entity == fmt.Sprintf("osd.%d", osdID) {
crashID = c.ID
break
}
}

err = client.ArchiveCrash(clusterdContext, clusterInfo, crashID)
if err != nil {
logger.Errorf("failed to archive the crash %q. %v", crashID, err)