Merge pull request #9350 from rook/mergify/bp/release-1.8/pr-9214
pool: Update the desired failure domain when changed (backport #9214)
mergify[bot] committed Dec 8, 2021
2 parents f0e19ea + 00eaccb commit f2fd2b7
Showing 6 changed files with 186 additions and 53 deletions.
50 changes: 0 additions & 50 deletions Documentation/ceph-advanced-configuration.md
@@ -20,7 +20,6 @@ storage cluster.
* [OSD CRUSH Settings](#osd-crush-settings)
* [OSD Dedicated Network](#osd-dedicated-network)
* [Phantom OSD Removal](#phantom-osd-removal)
* [Change Failure Domain](#change-failure-domain)
* [Auto Expansion of OSDs](#auto-expansion-of-osds)

## Prerequisites
@@ -432,55 +431,6 @@ To recheck that the Phantom OSD was removed, re-run the following command and ch
ceph osd tree
```

## Change Failure Domain

In Rook, it is now possible to indicate how the default CRUSH failure domain rule should be configured, in order to ensure that replicas or erasure-coded shards are separated across hosts and that a single host failure does not affect availability. For instance, this is an example manifest of a block pool named `replicapool` configured with its `failureDomain` set to `osd`:

```yaml
apiVersion: ceph.rook.io/v1
kind: CephBlockPool
metadata:
name: replicapool
namespace: rook
spec:
# The failure domain will spread the replicas of the data across different failure zones
failureDomain: osd
...
```

However, for various reasons we may need to change this failure domain to its other value: `host`. Unfortunately, changing it directly in the YAML manifest is not currently handled by Rook, so the change has to be made with Ceph commands run from the Rook toolbox pod, for instance:

```console
ceph osd pool get replicapool crush_rule
```

>```
>crush_rule: replicapool
>```
```console
ceph osd crush rule create-replicated replicapool_host_rule default host
```

Note that the `host_rule` suffix in the rule name is only there to make clear what kind of rule is being created; any name will do, as long as it differs from the existing one. Once the new rule has been created, simply apply it to the block pool:

```console
ceph osd pool set replicapool crush_rule replicapool_host_rule
```

Then validate that it has actually been applied:

```console
ceph osd pool get replicapool crush_rule
```
>```
> crush_rule: replicapool_host_rule
>```
If the cluster's health was `HEALTH_OK` when the change was made, the new rule is applied to the cluster immediately and transparently, without service disruption.

Exactly the same approach can be used to change from `host` back to `osd`.
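
With this change, the same switch can be made declaratively instead of through the toolbox: when the `failureDomain` field of the `CephBlockPool` is edited, the operator creates a new CRUSH rule and assigns it to the pool. A minimal sketch of the edited spec, reusing the `replicapool` example above (only the relevant field is shown; the rule name in the comment follows the `<pool>_<failureDomain>` naming used by `ensureFailureDomain` in this PR):

```yaml
spec:
  # was "osd"; on reconciliation the operator creates a new CRUSH rule
  # (e.g. "replicapool_host") and sets it on the pool
  failureDomain: host
```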

## Auto Expansion of OSDs

### Prerequisites
2 changes: 1 addition & 1 deletion Documentation/ceph-pool-crd.md
@@ -188,7 +188,7 @@ stretched) then you will have 2 replicas per datacenter where each replica ends
* `erasureCoded`: Settings for an erasure-coded pool. If specified, `replicated` settings must not be specified. See below for more details on [erasure coding](#erasure-coding).
* `dataChunks`: Number of chunks to divide the original object into
* `codingChunks`: Number of coding chunks to generate
* `failureDomain`: The failure domain across which the data will be spread. This can be set to a value of either `osd` or `host`, with `host` being the default setting. A failure domain can also be set to a different type (e.g. `rack`), if it is added as a `location` in the [Storage Selection Settings](ceph-cluster-crd.md#storage-selection-settings).
* `failureDomain`: The failure domain across which the data will be spread. This can be set to a value of either `osd` or `host`, with `host` being the default setting. A failure domain can also be set to a different type (e.g. `rack`), if the OSDs are created on nodes with the supported [topology labels](ceph-cluster-crd.md#osd-topology). If the `failureDomain` is changed on the pool, the operator will create a new CRUSH rule and update the pool.
If a `replicated` pool of size `3` is configured and the `failureDomain` is set to `host`, all three copies of the replicated data will be placed on OSDs located on `3` different Ceph hosts. This case is guaranteed to tolerate a failure of two hosts without a loss of data. Similarly, a failure domain set to `osd` can tolerate a loss of two OSD devices.
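
For instance, here is a sketch of a replicated pool whose failure domain is being changed from `osd` to `host` (pool name and namespace are illustrative); when reconciled, the operator creates a new replicated CRUSH rule (named `<pool>_<failureDomain>` by the code added in this PR) and assigns it to the pool:

```yaml
apiVersion: ceph.rook.io/v1
kind: CephBlockPool
metadata:
  name: replicapool
  namespace: rook-ceph
spec:
  # previously "osd"
  failureDomain: host
  replicated:
    size: 3
```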

If erasure coding is used, the data and coding chunks are spread across the configured failure domain.
1 change: 1 addition & 0 deletions PendingReleaseNotes.md
@@ -24,5 +24,6 @@ v1.8...
(rook-ceph-mon secrets and configmap) so that the resources will not be accidentally deleted.
- Add support for [Kubernetes Authentication when using HashiCorp Vault Key Management Service](Documentation/ceph-kms.md##kubernetes-based-authentication).
- Bucket notification supported via CRDs
- The failure domain of a pool can be changed on the CephBlockPool instead of requiring toolbox commands
- The Rook Operator and the toolbox now run under the "rook" user and no longer use "root".
- The Operator image now includes the `s5cmd` binary to interact with S3 gateways.
19 changes: 19 additions & 0 deletions pkg/daemon/ceph/client/crush_rule.go
@@ -17,9 +17,12 @@ limitations under the License.
package client

import (
    "encoding/json"
    "fmt"

    "github.com/pkg/errors"
    cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
    "github.com/rook/rook/pkg/clusterd"
)

const (
@@ -180,3 +183,19 @@ func checkIfRuleIDExists(rules []ruleSpec, ID int) bool {

    return false
}

func getCrushRule(context *clusterd.Context, clusterInfo *ClusterInfo, name string) (ruleSpec, error) {
    var rule ruleSpec
    args := []string{"osd", "crush", "rule", "dump", name}
    buf, err := NewCephCommand(context, clusterInfo, args).Run()
    if err != nil {
        return rule, errors.Wrapf(err, "failed to get crush rule %q. %s", name, string(buf))
    }

    err = json.Unmarshal(buf, &rule)
    if err != nil {
        return rule, errors.Wrapf(err, "failed to unmarshal crush rule. %s", string(buf))
    }

    return rule, nil
}
84 changes: 82 additions & 2 deletions pkg/daemon/ceph/client/pool.go
@@ -49,12 +49,12 @@ type CephStoragePoolDetails struct {
    Number                 int     `json:"pool_id"`
    Size                   uint    `json:"size"`
    ErasureCodeProfile     string  `json:"erasure_code_profile"`
    FailureDomain          string  `json:"failureDomain"`
    CrushRoot              string  `json:"crushRoot"`
    DeviceClass            string  `json:"deviceClass"`
    CompressionMode        string  `json:"compression_mode"`
    TargetSizeRatio        float64 `json:"target_size_ratio,omitempty"`
    RequireSafeReplicaSize bool    `json:"requireSafeReplicaSize,omitempty"`
    CrushRule              string  `json:"crush_rule"`
}

type CephStoragePoolStats struct {
@@ -128,7 +128,6 @@ func GetPoolDetails(context *clusterd.Context, clusterInfo *ClusterInfo, name st
}

func ParsePoolDetails(in []byte) (CephStoragePoolDetails, error) {

    // The response for osd pool get when passing var=all is actually malformed JSON similar to:
    // {"pool":"rbd","size":1}{"pool":"rbd","min_size":2}...
    // Note the multiple top level entities, one for each property returned. To workaround this,
@@ -372,6 +371,9 @@ func CreateECPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, poo
}

func CreateReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, poolName string, pool cephv1.PoolSpec, pgCount, appName string) error {
    // If it's a replicated pool, ensure the failure domain is desired
    checkFailureDomain := false

    // The crush rule name is the same as the pool unless we have a stretch cluster.
    crushRuleName := poolName
    if clusterSpec.IsStretchCluster() {
@@ -394,6 +396,7 @@
        }
    } else {
        // create a crush rule for a replicated pool, if a failure domain is specified
        checkFailureDomain = true
        if err := createReplicationCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool); err != nil {
            return errors.Wrapf(err, "failed to create replicated crush rule %q", crushRuleName)
        }
@@ -421,6 +424,83 @@
    }

    logger.Infof("creating replicated pool %s succeeded", poolName)

    if checkFailureDomain {
        if err = ensureFailureDomain(context, clusterInfo, clusterSpec, poolName, pool); err != nil {
            return nil
        }
    }
    return nil
}

func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, poolName string, pool cephv1.PoolSpec) error {
    if pool.FailureDomain == "" {
        logger.Debugf("skipping check for failure domain on pool %q as it is not specified", poolName)
        return nil
    }

    logger.Debugf("checking that pool %q has the failure domain %q", poolName, pool.FailureDomain)
    details, err := GetPoolDetails(context, clusterInfo, poolName)
    if err != nil {
        return errors.Wrapf(err, "failed to get pool %q details", poolName)
    }

    // Find the failure domain for the current crush rule
    rule, err := getCrushRule(context, clusterInfo, details.CrushRule)
    if err != nil {
        return errors.Wrapf(err, "failed to get crush rule %q", details.CrushRule)
    }
    currentFailureDomain := extractFailureDomain(rule)
    if currentFailureDomain == pool.FailureDomain {
        logger.Debugf("pool %q has the expected failure domain %q", poolName, pool.FailureDomain)
        return nil
    }
    if currentFailureDomain == "" {
        logger.Warningf("failure domain not found for crush rule %q, proceeding to create a new crush rule", details.CrushRule)
    }

    // Use a crush rule name that is unique to the desired failure domain
    crushRuleName := fmt.Sprintf("%s_%s", poolName, pool.FailureDomain)
    logger.Infof("updating pool %q failure domain from %q to %q with new crush rule %q", poolName, currentFailureDomain, pool.FailureDomain, crushRuleName)
    logger.Infof("crush rule %q will no longer be used by pool %q", details.CrushRule, poolName)

    // Create a new crush rule for the expected failure domain
    if err := createReplicationCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool); err != nil {
        return errors.Wrapf(err, "failed to create replicated crush rule %q", crushRuleName)
    }

    // Update the crush rule on the pool
    if err := setCrushRule(context, clusterInfo, poolName, crushRuleName); err != nil {
        return errors.Wrapf(err, "failed to set crush rule on pool %q", poolName)
    }

    logger.Infof("Successfully updated pool %q failure domain to %q", poolName, pool.FailureDomain)
    return nil
}

func extractFailureDomain(rule ruleSpec) string {
    // find the failure domain in the crush rule, which is the first step where the
    // "type" property is set
    for i, step := range rule.Steps {
        if step.Type != "" {
            return step.Type
        }
        // We expect the rule to be found by the second step, or else it is a more
        // complex rule that would not be supported for updating the failure domain
        if i == 1 {
            break
        }
    }
    return ""
}

func setCrushRule(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, crushRule string) error {
    args := []string{"osd", "pool", "set", poolName, "crush_rule", crushRule}

    _, err := NewCephCommand(context, clusterInfo, args).Run()
    if err != nil {
        return errors.Wrapf(err, "failed to set crush rule %q", crushRule)
    }
    return nil
}

83 changes: 83 additions & 0 deletions pkg/daemon/ceph/client/pool_test.go
@@ -16,6 +16,7 @@ limitations under the License.
package client

import (
    "fmt"
    "os/exec"
    "reflect"
    "strconv"
@@ -183,6 +184,88 @@ func testCreateReplicaPool(t *testing.T, failureDomain, crushRoot, deviceClass,
    }
}

func TestUpdateFailureDomain(t *testing.T) {
    var newCrushRule string
    currentFailureDomain := "rack"
    executor := &exectest.MockExecutor{}
    context := &clusterd.Context{Executor: executor}
    executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) {
        logger.Infof("Command: %s %v", command, args)
        if args[1] == "pool" {
            if args[2] == "get" {
                assert.Equal(t, "mypool", args[3])
                return `{"crush_rule": "test_rule"}`, nil
            }
            if args[2] == "set" {
                assert.Equal(t, "mypool", args[3])
                assert.Equal(t, "crush_rule", args[4])
                newCrushRule = args[5]
                return "", nil
            }
        }
        if args[1] == "crush" {
            if args[2] == "rule" && args[3] == "dump" {
                return fmt.Sprintf(`{"steps": [{"foo":"bar"},{"type":"%s"}]}`, currentFailureDomain), nil
            }
            newCrushRule = "foo"
            return "", nil
        }
        return "", errors.Errorf("unexpected ceph command %q", args)
    }

    t.Run("no desired failure domain", func(t *testing.T) {
        p := cephv1.PoolSpec{
            Replicated: cephv1.ReplicatedSpec{Size: 3},
        }
        clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}}
        err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p)
        assert.NoError(t, err)
        assert.Equal(t, "", newCrushRule)
    })

    t.Run("same failure domain", func(t *testing.T) {
        p := cephv1.PoolSpec{
            FailureDomain: currentFailureDomain,
            Replicated:    cephv1.ReplicatedSpec{Size: 3},
        }
        clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}}
        err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p)
        assert.NoError(t, err)
        assert.Equal(t, "", newCrushRule)
    })

    t.Run("changing failure domain", func(t *testing.T) {
        p := cephv1.PoolSpec{
            FailureDomain: "zone",
            Replicated:    cephv1.ReplicatedSpec{Size: 3},
        }
        clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}}
        err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p)
        assert.NoError(t, err)
        assert.Equal(t, "mypool_zone", newCrushRule)
    })
}

func TestExtractFailureDomain(t *testing.T) {
    t.Run("complex crush rule skipped", func(t *testing.T) {
        rule := ruleSpec{Steps: []stepSpec{
            {Type: ""},
            {Type: ""},
            {Type: "zone"},
        }}
        failureDomain := extractFailureDomain(rule)
        assert.Equal(t, "", failureDomain)
    })
    t.Run("valid crush rule", func(t *testing.T) {
        rule := ruleSpec{Steps: []stepSpec{
            {Type: ""},
            {Type: "zone"},
        }}
        failureDomain := extractFailureDomain(rule)
        assert.Equal(t, "zone", failureDomain)
    })
}

func testIsStringInSlice(a string, list []string) bool {
    for _, b := range list {
        if b == a {
