osd: handle removal of encrypted osd deployment
Signed-off-by: Sébastien Han <seb@redhat.com>
leseb committed Dec 16, 2021
1 parent 07f101e commit f2c0384
Showing 8 changed files with 126 additions and 24 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/canary-integration-test.yml
@@ -289,6 +289,11 @@ jobs:
kubectl -n rook-ceph get secrets
sudo lsblk
- name: test osd removal and re-hydration
run: |
kubectl -n rook-ceph delete deploy/rook-ceph-osd-0
tests/scripts/github-action-helper.sh wait_for_ceph_to_be_ready osd 2
- name: collect common logs
if: always()
uses: ./.github/workflows/collect-logs
@@ -617,7 +622,7 @@ jobs:
- name: wait for ceph cluster 1 to be ready
run: |
mkdir test
mkdir -p test
tests/scripts/validate_cluster.sh osd 1
kubectl -n rook-ceph get pods
2 changes: 1 addition & 1 deletion pkg/clusterd/disk.go
@@ -89,7 +89,7 @@ func DiscoverDevices(executor exec.Executor) ([]*sys.LocalDisk, error) {
// We only test if the type is 'disk'; this is a property reported by lsblk
// and means it's a parent block device
if disk.Type == sys.DiskType {
deviceChild, err := sys.ListDevicesChild(executor, d)
deviceChild, err := sys.ListDevicesChild(executor, fmt.Sprintf("/dev/%s", d))
if err != nil {
logger.Warningf("failed to detect child devices for device %q, assuming they are none. %v", d, err)
}
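For context, ListDevicesChild now receives a full path such as "/dev/sda" rather than the bare name reported by discovery, since lsblk expects a device path. Below is a minimal, self-contained sketch of the fixed call site; the helper is a simplified stand-in, not Rook's actual sys package.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// listDevicesChild is a simplified stand-in for sys.ListDevicesChild:
// it runs lsblk against a full device path and returns one path per line.
func listDevicesChild(devicePath string) ([]string, error) {
	out, err := exec.Command("lsblk", "--noheadings", "--path", "--list", "--output", "NAME", devicePath).Output()
	if err != nil {
		return nil, fmt.Errorf("failed to list child devices of %q: %v", devicePath, err)
	}
	return strings.Fields(string(out)), nil
}

func main() {
	d := "sda" // bare name as reported by device discovery
	// The fix: prepend /dev/ so lsblk receives a real path.
	children, err := listDevicesChild(fmt.Sprintf("/dev/%s", d))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(children)
}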
9 changes: 8 additions & 1 deletion pkg/daemon/ceph/osd/daemon.go
@@ -302,8 +302,15 @@ func getAvailableDevices(context *clusterd.Context, agent *OsdAgent) (*DeviceOsd
// Allow further inspection of that device before skipping it
if device.Filesystem == "crypto_LUKS" && agent.pvcBacked {
if isCephEncryptedBlock(context, agent.clusterInfo.FSID, device.Name) {
logger.Infof("encrypted disk %q is an OSD part of this cluster, considering it", device.Name)
logger.Infof("encrypted disk %q is an OSD part of this cluster, skipping it", device.Name)
} else {
logger.Infof("encrypted disk %q is unknown, skipping it", device.Name)
}
// We must skip it so that the device is not marked as available; it will later be
// picked up by the GetCephVolumeRawOSDs() call.
// This handles the case where the OSD deployment has been removed and the prepare
// job kicks in again to re-deploy the OSD.
continue
} else {
logger.Infof("skipping device %q because it contains a filesystem %q", device.Name, device.Filesystem)
continue
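The skip above relies on isCephEncryptedBlock() to decide whether a crypto_LUKS device belongs to this cluster. Here is a hedged sketch of how such a check can work, assuming it inspects the LUKS header for the cluster FSID; the real Rook helper may match different header fields.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// isCephEncryptedBlock is a hypothetical sketch: it dumps the LUKS header
// and checks whether the cluster FSID appears in it. This only illustrates
// the idea; it is not Rook's implementation.
func isCephEncryptedBlock(fsid, device string) bool {
	out, err := exec.Command("cryptsetup", "luksDump", device).CombinedOutput()
	if err != nil {
		// Not a valid LUKS device (cryptsetup prints
		// "Device ... is not a valid LUKS device.") or the dump failed.
		return false
	}
	return strings.Contains(string(out), fsid)
}

func main() {
	fmt.Println(isCephEncryptedBlock("fea59c09-2d35-4096-bc46-edb0fd39ab86", "/dev/sdc"))
}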
1 change: 1 addition & 0 deletions pkg/daemon/ceph/osd/encryption.go
@@ -98,6 +98,7 @@ func setLUKSLabelAndSubsystem(context *clusterd.Context, clusterInfo *cephclient
label := fmt.Sprintf("pvc_name=%s", pvcName)

logger.Infof("setting LUKS subsystem to %q and label to %q to disk %q", subsystem, label, disk)
// Both the label and the subsystem are limited to 48 characters
args := []string{"config", disk, "--subsystem", subsystem, "--label", label}
output, err := context.Executor.ExecuteCommandWithCombinedOutput(cryptsetupBinary, args...)
if err != nil {
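Since cryptsetup caps both fields at 48 characters, a defensive sketch might truncate the values before running cryptsetup config. The truncation helper below is illustrative and not part of this commit.

package main

import (
	"fmt"
	"os/exec"
)

const luksFieldLimit = 48 // limit for both --label and --subsystem

// truncateLUKSField trims a value to the cryptsetup field limit.
func truncateLUKSField(v string) string {
	if len(v) > luksFieldLimit {
		return v[:luksFieldLimit]
	}
	return v
}

func setLUKSLabelAndSubsystem(disk, subsystem, label string) error {
	args := []string{"config", disk,
		"--subsystem", truncateLUKSField(subsystem),
		"--label", truncateLUKSField(label)}
	out, err := exec.Command("cryptsetup", args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to set label/subsystem on %q: %v (%s)", disk, err, out)
	}
	return nil
}

func main() {
	_ = setLUKSLabelAndSubsystem("/dev/sdd", "ceph_fsid=fea59c09", "pvc_name=my-pvc")
}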
103 changes: 85 additions & 18 deletions pkg/daemon/ceph/osd/volume.go
@@ -171,22 +171,22 @@ func (a *OsdAgent) configureCVDevices(context *clusterd.Context, devices *Device
osds = append(osds, rawOsds...)
}

return osds, nil
}

// List existing OSD(s) configured with ceph-volume lvm mode
lvmOsds, err = GetCephVolumeLVMOSDs(context, a.clusterInfo, a.clusterInfo.FSID, lvPath, false, false)
if err != nil {
logger.Infof("failed to get devices already provisioned by ceph-volume. %v", err)
}
osds = append(osds, lvmOsds...)
} else {
// Non-PVC case
// List existing OSD(s) configured with ceph-volume lvm mode
lvmOsds, err = GetCephVolumeLVMOSDs(context, a.clusterInfo, a.clusterInfo.FSID, lvPath, false, false)
if err != nil {
logger.Infof("failed to get devices already provisioned by ceph-volume. %v", err)
}
osds = append(osds, lvmOsds...)

// List existing OSD(s) configured with ceph-volume raw mode
rawOsds, err = GetCephVolumeRawOSDs(context, a.clusterInfo, a.clusterInfo.FSID, block, "", "", false, false)
if err != nil {
logger.Infof("failed to get device already provisioned by ceph-volume raw. %v", err)
// List existing OSD(s) configured with ceph-volume raw mode
rawOsds, err = GetCephVolumeRawOSDs(context, a.clusterInfo, a.clusterInfo.FSID, block, "", "", false, false)
if err != nil {
logger.Infof("failed to get device already provisioned by ceph-volume raw. %v", err)
}
osds = appendOSDInfo(osds, rawOsds)
}
osds = appendOSDInfo(osds, rawOsds)

return osds, nil
}
@@ -960,6 +960,68 @@ func GetCephVolumeRawOSDs(context *clusterd.Context, clusterInfo *client.Cluster
// it can be the one passed from the function's call or discovered by the c-v list command
var blockPath string

// If a block is passed in, check whether it's an encrypted device; this is needed to get
// the correct device path and populate the OSDInfo for that OSD.
// When the device is passed in, it means no devices were found available, which
// indicates the OSDs have been prepared already.
// However, there is a scenario where we run the prepare job again: when the OSD
// deployment is removed. The operator reconciles upon deletion of the OSD deployment,
// thus re-running the prepare job to re-hydrate the OSDInfo.
//
// isCephEncryptedBlock() returns false if the disk is not a LUKS device, in which case
// cryptsetup reports: Device /dev/sdc is not a valid LUKS device.
if isCephEncryptedBlock(context, cephfsid, block) {
childDevice, err := sys.ListDevicesChild(context.Executor, block)
if err != nil {
return nil, err
}

var encryptedBlock string
// Find the encrypted block in the output.
// Most of the time we get 2 devices, the parent and the child, but we don't want to guess
// which one is the child by looking at the index; instead we iterate over the list.
// Our encrypted device **always** has "-dmcrypt" in the name.
for _, device := range childDevice {
if strings.Contains(device, "-dmcrypt") {
encryptedBlock = device
break
}
}
if encryptedBlock == "" {
// The encrypted block is not opened; this is an extreme corner case where
// the OSD deployment has been removed manually AND the node rebooted,
// so we would need to re-open the block to re-hydrate the OSDInfo.
//
// Handling this case would mean writing the encryption key to a temporary file, then calling
// luksOpen to open the encrypted block, and then running ceph-volume list against the opened
// encrypted block.
// We don't implement this yet and return an error instead.
return nil, errors.Errorf("failed to find the encrypted block device for %q, not opened?", block)
}

// If we have one child device, it should be the encrypted block, but we still verify it
isDeviceEncrypted, err := sys.IsDeviceEncrypted(context.Executor, encryptedBlock)
if err != nil {
return nil, err
}

// All good, now we set the right disk for ceph-volume to list against.
// The ceph-volume output will look like:
// [root@bde85e6b23ec /]# ceph-volume raw list /dev/mapper/ocs-deviceset-thin-1-data-0hmfgp-block-dmcrypt
// {
// "4": {
// "ceph_fsid": "fea59c09-2d35-4096-bc46-edb0fd39ab86",
// "device": "/dev/mapper/ocs-deviceset-thin-1-data-0hmfgp-block-dmcrypt",
// "osd_id": 4,
// "osd_uuid": "fcff2062-e653-4d42-84e5-e8de639bed4b",
// "type": "bluestore"
// }
// }
if isDeviceEncrypted {
block = encryptedBlock
}
}

args := []string{cvMode, "list", block, "--format", "json"}
if block == "" {
setDevicePathFromList = true
@@ -1038,12 +1100,17 @@ func GetCephVolumeRawOSDs(context *clusterd.Context, clusterInfo *client.Cluster

// If this is an encrypted OSD
if os.Getenv(oposd.CephVolumeEncryptedKeyEnvVarName) != "" {
// Set subsystem and label for recovery and detection
// If the label and subsystem are not set on the encrypted block yet, set them now.
// They will already be set if the OSD deployment has been removed manually and the
// prepare job runs again.
// We use /mnt/<pvc_name> since the LUKS label/subsystem must be applied on the main
// block device, not the resulting encrypted dm device.
mainBlock := fmt.Sprintf("/mnt/%s", os.Getenv(oposd.PVCNameEnvVarName))
err = setLUKSLabelAndSubsystem(context, clusterInfo, mainBlock)
if err != nil {
return nil, errors.Wrapf(err, "failed to set subsystem and label to encrypted device %q for osd %d", mainBlock, osdID)
if !isCephEncryptedBlock(context, cephfsid, mainBlock) {
// Set subsystem and label for recovery and detection
err = setLUKSLabelAndSubsystem(context, clusterInfo, mainBlock)
if err != nil {
return nil, errors.Wrapf(err, "failed to set subsystem and label to encrypted device %q for osd %d", mainBlock, osdID)
}
}

// Close encrypted device
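The ceph-volume raw list JSON shown in the comment above maps OSD IDs to per-OSD records. A minimal sketch of decoding it in Go, using only the fields visible in the sample output; the struct is illustrative, not Rook's actual type.

package main

import (
	"encoding/json"
	"fmt"
)

// rawOSD mirrors the fields visible in the sample `ceph-volume raw list` output.
type rawOSD struct {
	CephFSID string `json:"ceph_fsid"`
	Device   string `json:"device"`
	OsdID    int    `json:"osd_id"`
	OsdUUID  string `json:"osd_uuid"`
	Type     string `json:"type"`
}

func main() {
	out := []byte(`{
	  "4": {
	    "ceph_fsid": "fea59c09-2d35-4096-bc46-edb0fd39ab86",
	    "device": "/dev/mapper/ocs-deviceset-thin-1-data-0hmfgp-block-dmcrypt",
	    "osd_id": 4,
	    "osd_uuid": "fcff2062-e653-4d42-84e5-e8de639bed4b",
	    "type": "bluestore"
	  }
	}`)

	// The top-level keys are OSD IDs as strings.
	var osds map[string]rawOSD
	if err := json.Unmarshal(out, &osds); err != nil {
		fmt.Println(err)
		return
	}
	for id, o := range osds {
		fmt.Printf("osd.%s -> %s (%s)\n", id, o.Device, o.Type)
	}
}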
9 changes: 9 additions & 0 deletions pkg/operator/ceph/cluster/osd/create.go
@@ -190,6 +190,13 @@ func (c *Cluster) startProvisioningOverPVCs(config *provisionConfig, errs *provi
}
osdProps.storeConfig.DeviceClass = volume.CrushDeviceClass

// Skip the OSD prepare job if a deployment already exists for the PVC.
// This also skips the encryption work to avoid overwriting the existing encryption key.
if existingDeployments.Has(dataSource.ClaimName) {
logger.Infof("skipping OSD prepare job creation for PVC %q because OSD daemon using the PVC already exists", osdProps.crushHostname)
continue
}

if osdProps.encrypted {
// If the deviceSet template has "encrypted" but the Ceph version is not compatible
if !c.clusterInfo.CephVersion.IsAtLeast(cephVolumeRawEncryptionModeMinOctopusCephVersion) {
@@ -221,6 +228,8 @@ func (c *Cluster) startProvisioningOverPVCs(config *provisionConfig, errs *provi
}

// Generate and store the encrypted key in whatever KMS is configured.
// The PutSecret() call for each backend verifies whether the key is already present, so
// there is no risk of overwriting an existing key.
err = kmsConfig.PutSecret(osdProps.pvc.ClaimName, key)
if err != nil {
errMsg := fmt.Sprintf("failed to store secret. %v", err)
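A minimal sketch of the check-before-write behavior that comment relies on; the secretStore interface below is hypothetical and stands in for whatever KMS backend is configured.

package main

import "fmt"

// secretStore is a hypothetical KMS-like interface.
type secretStore interface {
	Get(name string) (string, bool)
	Set(name, value string)
}

type memStore map[string]string

func (m memStore) Get(name string) (string, bool) { v, ok := m[name]; return v, ok }
func (m memStore) Set(name, value string)         { m[name] = value }

// putSecret only writes the key if it is not present yet, so a re-run of
// the prepare job cannot overwrite an existing encryption key.
func putSecret(s secretStore, name, key string) {
	if _, exists := s.Get(name); exists {
		return // keep the existing key
	}
	s.Set(name, key)
}

func main() {
	store := memStore{}
	putSecret(store, "pvc-1", "key-A")
	putSecret(store, "pvc-1", "key-B") // ignored: a key is already present
	v, _ := store.Get("pvc-1")
	fmt.Println(v) // key-A
}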
17 changes: 15 additions & 2 deletions pkg/util/sys/device.go
@@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
osexec "os/exec"
"path"
"strconv"
"strings"

@@ -453,11 +452,25 @@ func lvmList(executor exec.Executor, lv string) (CephVolumeLVMList, error) {
}

// ListDevicesChild lists all children available on a device
// For an encrypted device, it will return the encrypted device like so:
// lsblk --noheadings --output NAME --path --list /dev/sdd
// /dev/sdd
// /dev/mapper/ocs-deviceset-thin-1-data-0hmfgp-block-dmcrypt
func ListDevicesChild(executor exec.Executor, device string) ([]string, error) {
childListRaw, err := executor.ExecuteCommandWithOutput("lsblk", "--noheadings", "--pairs", path.Join("/dev", device))
childListRaw, err := executor.ExecuteCommandWithOutput("lsblk", "--noheadings", "--path", "--list", "--output", "NAME", device)
if err != nil {
return []string{}, fmt.Errorf("failed to list child devices of %q. %v", device, err)
}

return strings.Split(childListRaw, "\n"), nil
}

// IsDeviceEncrypted returns whether the disk has a "crypt" type as reported by lsblk
func IsDeviceEncrypted(executor exec.Executor, device string) (bool, error) {
deviceType, err := executor.ExecuteCommandWithOutput("lsblk", "--noheadings", "--output", "TYPE", device)
if err != nil {
return false, fmt.Errorf("failed to get device type of %q. %v", device, err)
}

return deviceType == "crypt", nil
}
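A usage sketch combining the two helpers above to locate an opened dmcrypt child of a device; it shells out with os/exec directly instead of Rook's executor abstraction.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// findDmcryptChild lists a device's children and returns the first one whose
// name contains "-dmcrypt" and whose lsblk TYPE is "crypt".
func findDmcryptChild(device string) (string, error) {
	out, err := exec.Command("lsblk", "--noheadings", "--path", "--list", "--output", "NAME", device).Output()
	if err != nil {
		return "", fmt.Errorf("failed to list child devices of %q: %v", device, err)
	}
	for _, child := range strings.Fields(string(out)) {
		if !strings.Contains(child, "-dmcrypt") {
			continue
		}
		t, err := exec.Command("lsblk", "--noheadings", "--output", "TYPE", child).Output()
		if err != nil {
			return "", fmt.Errorf("failed to get device type of %q: %v", child, err)
		}
		if strings.TrimSpace(string(t)) == "crypt" {
			return child, nil
		}
	}
	return "", fmt.Errorf("no opened dmcrypt child found for %q", device)
}

func main() {
	child, err := findDmcryptChild("/dev/sdd")
	fmt.Println(child, err)
}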
2 changes: 1 addition & 1 deletion tests/scripts/github-action-helper.sh
@@ -250,7 +250,7 @@ function wait_for_prepare_pod() {
function wait_for_ceph_to_be_ready() {
DAEMONS=$1
OSD_COUNT=$2
mkdir test
mkdir -p test
tests/scripts/validate_cluster.sh "$DAEMONS" "$OSD_COUNT"
kubectl -n rook-ceph get pods
}
