diff --git a/Documentation/ceph-pool-crd.md b/Documentation/ceph-pool-crd.md index 48fb4219af8fc..bc5cc6040565a 100644 --- a/Documentation/ceph-pool-crd.md +++ b/Documentation/ceph-pool-crd.md @@ -197,6 +197,11 @@ stretched) then you will have 2 replicas per datacenter where each replica ends * `deviceClass`: Sets up the CRUSH rule for the pool to distribute data only on the specified device class. If left empty or unspecified, the pool will use the cluster's default CRUSH root, which usually distributes data over all OSDs, regardless of their class. * `crushRoot`: The root in the crush map to be used by the pool. If left empty or unspecified, the default root will be used. Creating a crush hierarchy for the OSDs currently requires the Rook toolbox to run the Ceph tools described [here](http://docs.ceph.com/docs/master/rados/operations/crush-map/#modifying-the-crush-map). * `enableRBDStats`: Enables collecting RBD per-image IO statistics by enabling dynamic OSD performance counters. Defaults to false. For more info see the [ceph documentation](https://docs.ceph.com/docs/master/mgr/prometheus/#rbd-io-statistics). +* `name`: The name of Ceph pools is based on the `metadata.name` of the CephBlockPool CR. Some built-in Ceph pools + require names that are incompatible with K8s resource names. These special pools can be configured + by setting this `name` to override the name of the Ceph pool that is created instead of using the `metadata.name` for the pool. + Two pool names are supported: `device_health_metrics` and `.nfs`. See the example + [device health metrics pool](https://github.com/rook/rook/blob/{{ branchName }}/deploy/examples/pool-device-health-metrics.yaml). * `parameters`: Sets any [parameters](https://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values) listed to the given pool * `target_size_ratio:` gives a hint (%) to Ceph in terms of expected consumption of the total cluster capacity of a given pool, for more info see the [ceph documentation](https://docs.ceph.com/docs/master/rados/operations/placement-groups/#specifying-expected-pool-size) diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index 6a19264219d61..33531a8fce043 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -30,7 +30,7 @@ spec: metadata: type: object spec: - description: PoolSpec represents the spec of ceph pool + description: NamedPoolSpec allows a Ceph pool to be created with a non-default name properties: compressionMode: description: 'DEPRECATED: use Parameters instead, e.g., Parameters["compression_mode"] = "force" The inline compression mode in Bluestore OSD to set to (options are: none, passive, aggressive, force) Do NOT set a default value for kubebuilder as this will override the Parameters' @@ -110,6 +110,9 @@ spec: type: object type: array type: object + name: + description: The desired name of the pool if different from the default name. 
+ type: string parameters: additionalProperties: type: string diff --git a/deploy/examples/cluster-test.yaml b/deploy/examples/cluster-test.yaml index 65e79072b19e5..cd3a3d1111b22 100644 --- a/deploy/examples/cluster-test.yaml +++ b/deploy/examples/cluster-test.yaml @@ -51,3 +51,15 @@ spec: timeout: 600s disruptionManagement: managePodBudgets: true +--- +apiVersion: ceph.rook.io/v1 +kind: CephBlockPool +metadata: + name: device-health-metrics + namespace: rook-ceph # namespace:cluster +spec: + name: device_health_metrics + failureDomain: host + replicated: + size: 1 + requireSafeReplicaSize: false diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index d8200eb3aac34..05e6d6d67b833 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -33,7 +33,7 @@ spec: metadata: type: object spec: - description: PoolSpec represents the spec of ceph pool + description: NamedPoolSpec allows a Ceph pool to be created with a non-default name properties: compressionMode: description: 'DEPRECATED: use Parameters instead, e.g., Parameters["compression_mode"] = "force" The inline compression mode in Bluestore OSD to set to (options are: none, passive, aggressive, force) Do NOT set a default value for kubebuilder as this will override the Parameters' @@ -113,6 +113,9 @@ spec: type: object type: array type: object + name: + description: The desired name of the pool if different from the default name. + type: string parameters: additionalProperties: type: string diff --git a/deploy/examples/nfs.yaml b/deploy/examples/nfs.yaml index 0fd3eb731865e..4b8ccee78181f 100644 --- a/deploy/examples/nfs.yaml +++ b/deploy/examples/nfs.yaml @@ -1,3 +1,27 @@ +################################################################################################################# +# Create a Ceph pool with settings for replication in production environments. A minimum of 3 OSDs on +# different hosts are required in this example. +# kubectl create -f pool.yaml +################################################################################################################# + +apiVersion: ceph.rook.io/v1 +kind: CephBlockPool +metadata: + name: builtin-nfs + namespace: rook-ceph # namespace:cluster +spec: + # The required pool name ".nfs" cannot be specified as a K8s resource name, thus we override + # the pool name created in Ceph with this name property + name: .nfs + failureDomain: host + replicated: + size: 3 + requireSafeReplicaSize: true + parameters: + compression_mode: none + mirroring: + enabled: false +--- apiVersion: ceph.rook.io/v1 kind: CephNFS metadata: diff --git a/deploy/examples/pool-device-health-metrics.yaml b/deploy/examples/pool-device-health-metrics.yaml new file mode 100644 index 0000000000000..e8becdbbda787 --- /dev/null +++ b/deploy/examples/pool-device-health-metrics.yaml @@ -0,0 +1,21 @@ +apiVersion: ceph.rook.io/v1 +kind: CephBlockPool +metadata: + # If the built-in Ceph pool for health metrics needs to be configured with alternate + # settings, create this pool with any of the pool properties. Create this pool immediately + # with the cluster CR, or else some properties may not be applied when Ceph creates the + # pool by default. + name: device-health-metrics + namespace: rook-ceph # namespace:cluster +spec: + # The required pool name with underscores cannot be specified as a K8s resource name, thus we override + # the pool name created in Ceph with this name property. 
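+  # Note: per the getCephName validation added in this change, the operator only honors this
+  # override for the built-in pool names "device_health_metrics" and ".nfs"; any other value
+  # is rejected during reconcile.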
+ name: device_health_metrics + failureDomain: host + replicated: + size: 3 + requireSafeReplicaSize: true + parameters: + compression_mode: none + mirroring: + enabled: false diff --git a/pkg/apis/ceph.rook.io/v1/pool.go b/pkg/apis/ceph.rook.io/v1/pool.go index 23ee14993cefc..021a49cc9ced1 100644 --- a/pkg/apis/ceph.rook.io/v1/pool.go +++ b/pkg/apis/ceph.rook.io/v1/pool.go @@ -60,7 +60,7 @@ func (p *CephBlockPool) ValidateCreate() error { return nil } -func validatePoolSpec(ps PoolSpec) error { +func validatePoolSpec(ps NamedPoolSpec) error { // Checks if either ErasureCoded or Replicated fields are set if ps.ErasureCoded.CodingChunks <= 0 && ps.ErasureCoded.DataChunks <= 0 && ps.Replicated.TargetSizeRatio <= 0 && ps.Replicated.Size <= 0 { return errors.New("invalid create: either of erasurecoded or replicated fields should be set") diff --git a/pkg/apis/ceph.rook.io/v1/pool_test.go b/pkg/apis/ceph.rook.io/v1/pool_test.go index 8e8bb5d427268..9c55734281d26 100644 --- a/pkg/apis/ceph.rook.io/v1/pool_test.go +++ b/pkg/apis/ceph.rook.io/v1/pool_test.go @@ -28,10 +28,12 @@ func TestValidatePoolSpec(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "ec-pool", }, - Spec: PoolSpec{ - ErasureCoded: ErasureCodedSpec{ - CodingChunks: 1, - DataChunks: 2, + Spec: NamedPoolSpec{ + PoolSpec: PoolSpec{ + ErasureCoded: ErasureCodedSpec{ + CodingChunks: 1, + DataChunks: 2, + }, }, }, } @@ -48,8 +50,10 @@ func TestCephBlockPoolValidateUpdate(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "ec-pool", }, - Spec: PoolSpec{ - Replicated: ReplicatedSpec{RequireSafeReplicaSize: true, Size: 3}, + Spec: NamedPoolSpec{ + PoolSpec: PoolSpec{ + Replicated: ReplicatedSpec{RequireSafeReplicaSize: true, Size: 3}, + }, }, } up := p.DeepCopy() diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index e038913354a67..66ccc9d6587a1 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -565,7 +565,7 @@ type CrashCollectorSpec struct { type CephBlockPool struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata"` - Spec PoolSpec `json:"spec"` + Spec NamedPoolSpec `json:"spec"` // +kubebuilder:pruning:PreserveUnknownFields Status *CephBlockPoolStatus `json:"status,omitempty"` } @@ -585,6 +585,16 @@ const ( DefaultCRUSHRoot = "default" ) +// NamedPoolSpec allows a Ceph pool to be created with a non-default name +type NamedPoolSpec struct { + // The desired name of the pool if different from the default name. + // +optional + Name string `json:"name,omitempty"` + + // The core pool configuration + PoolSpec `json:",inline"` +} + // PoolSpec represents the spec of ceph pool type PoolSpec struct { // The failure domain: osd/host/(region or zone if available) - technically also any type in the crush map diff --git a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go index 424d4df9246e9..27200c8159aec 100644 --- a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go +++ b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go @@ -2475,6 +2475,23 @@ func (in *NFSGaneshaSpec) DeepCopy() *NFSGaneshaSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NamedPoolSpec) DeepCopyInto(out *NamedPoolSpec) { + *out = *in + in.PoolSpec.DeepCopyInto(&out.PoolSpec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NamedPoolSpec. 
+func (in *NamedPoolSpec) DeepCopy() *NamedPoolSpec { + if in == nil { + return nil + } + out := new(NamedPoolSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkSpec) DeepCopyInto(out *NetworkSpec) { *out = *in diff --git a/pkg/daemon/ceph/client/crush_rule_test.go b/pkg/daemon/ceph/client/crush_rule_test.go index 77f19a60141bb..ce799bfdf1383 100644 --- a/pkg/daemon/ceph/client/crush_rule_test.go +++ b/pkg/daemon/ceph/client/crush_rule_test.go @@ -32,7 +32,7 @@ func TestBuildStretchClusterCrushRule(t *testing.T) { err := json.Unmarshal([]byte(testCrushMap), &crushMap) assert.NoError(t, err) - pool := &cephv1.PoolSpec{ + pool := cephv1.PoolSpec{ FailureDomain: "datacenter", CrushRoot: cephv1.DefaultCRUSHRoot, Replicated: cephv1.ReplicatedSpec{ @@ -40,19 +40,19 @@ func TestBuildStretchClusterCrushRule(t *testing.T) { }, } - rule := buildTwoStepCrushRule(crushMap, "stretched", *pool) + rule := buildTwoStepCrushRule(crushMap, "stretched", pool) assert.Equal(t, 2, rule.ID) } func TestBuildCrushSteps(t *testing.T) { - pool := &cephv1.PoolSpec{ + pool := cephv1.PoolSpec{ FailureDomain: "datacenter", CrushRoot: cephv1.DefaultCRUSHRoot, Replicated: cephv1.ReplicatedSpec{ ReplicasPerFailureDomain: 2, }, } - steps := buildTwoStepCrushSteps(*pool) + steps := buildTwoStepCrushSteps(pool) assert.Equal(t, 4, len(steps)) assert.Equal(t, cephv1.DefaultCRUSHRoot, steps[0].ItemName) assert.Equal(t, "datacenter", steps[1].Type) diff --git a/pkg/daemon/ceph/client/mirror.go b/pkg/daemon/ceph/client/mirror.go index e992d6622d504..d49a0d0b179e6 100644 --- a/pkg/daemon/ceph/client/mirror.go +++ b/pkg/daemon/ceph/client/mirror.go @@ -104,17 +104,17 @@ func CreateRBDMirrorBootstrapPeer(context *clusterd.Context, clusterInfo *Cluste } // enablePoolMirroring turns on mirroring on that pool by specifying the mirroring type -func enablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.PoolSpec, poolName string) error { - logger.Infof("enabling mirroring type %q for pool %q", pool.Mirroring.Mode, poolName) +func enablePoolMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error { + logger.Infof("enabling mirroring type %q for pool %q", pool.Mirroring.Mode, pool.Name) // Build command - args := []string{"mirror", "pool", "enable", poolName, pool.Mirroring.Mode} + args := []string{"mirror", "pool", "enable", pool.Name, pool.Mirroring.Mode} cmd := NewRBDCommand(context, clusterInfo, args) // Run command output, err := cmd.Run() if err != nil { - return errors.Wrapf(err, "failed to enable mirroring type %q for pool %q. %s", pool.Mirroring.Mode, poolName, output) + return errors.Wrapf(err, "failed to enable mirroring type %q for pool %q. 
%s", pool.Mirroring.Mode, pool.Name, output) } return nil @@ -246,17 +246,17 @@ func removeSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo, return nil } -func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, poolSpec cephv1.PoolSpec, poolName string) error { +func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error { logger.Info("resetting current snapshot schedules") // Reset any existing schedules - err := removeSnapshotSchedules(context, clusterInfo, poolSpec, poolName) + err := removeSnapshotSchedules(context, clusterInfo, pool) if err != nil { logger.Errorf("failed to remove snapshot schedules. %v", err) } // Enable all the snap schedules - for _, snapSchedule := range poolSpec.Mirroring.SnapshotSchedules { - err := enableSnapshotSchedule(context, clusterInfo, snapSchedule, poolName) + for _, snapSchedule := range pool.Mirroring.SnapshotSchedules { + err := enableSnapshotSchedule(context, clusterInfo, snapSchedule, pool.Name) if err != nil { return errors.Wrap(err, "failed to enable snapshot schedule") } @@ -266,16 +266,16 @@ func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo } // removeSnapshotSchedules removes all the existing snapshot schedules -func removeSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, poolSpec cephv1.PoolSpec, poolName string) error { +func removeSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error { // Get the list of existing snapshot schedule - existingSnapshotSchedules, err := listSnapshotSchedules(context, clusterInfo, poolName) + existingSnapshotSchedules, err := listSnapshotSchedules(context, clusterInfo, pool.Name) if err != nil { return errors.Wrap(err, "failed to list snapshot schedule(s)") } // Remove each schedule for _, existingSnapshotSchedule := range existingSnapshotSchedules { - err := removeSnapshotSchedule(context, clusterInfo, existingSnapshotSchedule, poolName) + err := removeSnapshotSchedule(context, clusterInfo, existingSnapshotSchedule, pool.Name) if err != nil { return errors.Wrapf(err, "failed to remove snapshot schedule %v", existingSnapshotSchedule) } diff --git a/pkg/daemon/ceph/client/mirror_test.go b/pkg/daemon/ceph/client/mirror_test.go index a25566afac962..e4b6a529d63dc 100644 --- a/pkg/daemon/ceph/client/mirror_test.go +++ b/pkg/daemon/ceph/client/mirror_test.go @@ -58,22 +58,26 @@ func TestCreateRBDMirrorBootstrapPeer(t *testing.T) { assert.Equal(t, bootstrapPeerToken, string(token)) } func TestEnablePoolMirroring(t *testing.T) { - pool := "pool-test" - poolSpec := cephv1.PoolSpec{Mirroring: cephv1.MirroringSpec{Mode: "image"}} + pool := cephv1.NamedPoolSpec{ + Name: "pool-test", + PoolSpec: cephv1.PoolSpec{ + Mirroring: cephv1.MirroringSpec{Mode: "image"}, + }, + } executor := &exectest.MockExecutor{} executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) { if args[0] == "mirror" { assert.Equal(t, "pool", args[1]) assert.Equal(t, "enable", args[2]) - assert.Equal(t, pool, args[3]) - assert.Equal(t, poolSpec.Mirroring.Mode, args[4]) + assert.Equal(t, pool.Name, args[3]) + assert.Equal(t, pool.Mirroring.Mode, args[4]) return "", nil } return "", errors.New("unknown command") } context := &clusterd.Context{Executor: executor} - err := enablePoolMirroring(context, AdminTestClusterInfo("mycluster"), poolSpec, pool) + err := enablePoolMirroring(context, AdminTestClusterInfo("mycluster"), 
pool) assert.NoError(t, err) } @@ -279,7 +283,6 @@ func TestRemoveSnapshotSchedule(t *testing.T) { } func TestRemoveSnapshotSchedules(t *testing.T) { - pool := "pool-test" interval := "24h" startTime := "14:00:00-05:00" executor := &exectest.MockExecutor{} @@ -297,8 +300,17 @@ func TestRemoveSnapshotSchedules(t *testing.T) { } context := &clusterd.Context{Executor: executor} - poolSpec := &cephv1.PoolSpec{Mirroring: cephv1.MirroringSpec{SnapshotSchedules: []cephv1.SnapshotScheduleSpec{{Interval: interval, StartTime: startTime}}}} - err := removeSnapshotSchedules(context, AdminTestClusterInfo("mycluster"), *poolSpec, pool) + pool := cephv1.NamedPoolSpec{ + Name: "pool-test", + PoolSpec: cephv1.PoolSpec{ + Mirroring: cephv1.MirroringSpec{ + SnapshotSchedules: []cephv1.SnapshotScheduleSpec{ + {Interval: interval, StartTime: startTime}, + }, + }, + }, + } + err := removeSnapshotSchedules(context, AdminTestClusterInfo("mycluster"), pool) assert.NoError(t, err) } diff --git a/pkg/daemon/ceph/client/pool.go b/pkg/daemon/ceph/client/pool.go index e870faaed7521..2c15844af241c 100644 --- a/pkg/daemon/ceph/client/pool.go +++ b/pkg/daemon/ceph/client/pool.go @@ -154,27 +154,29 @@ func ParsePoolDetails(in []byte) (CephStoragePoolDetails, error) { return poolDetails, nil } -func CreatePoolWithProfile(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, poolName string, pool cephv1.PoolSpec, appName string) error { +func CreatePoolWithProfile(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, pool cephv1.NamedPoolSpec, appName string) error { + if pool.Name == "" { + return errors.Errorf("pool name must be specified") + } if pool.IsReplicated() { - return CreateReplicatedPoolForApp(context, clusterInfo, clusterSpec, poolName, pool, DefaultPGCount, appName) + return CreateReplicatedPoolForApp(context, clusterInfo, clusterSpec, pool, DefaultPGCount, appName) } if !pool.IsErasureCoded() { // neither a replicated or EC pool - return fmt.Errorf("pool %q type is not defined as replicated or erasure coded", poolName) + return fmt.Errorf("pool %q type is not defined as replicated or erasure coded", pool.Name) } // create a new erasure code profile for the new pool - ecProfileName := GetErasureCodeProfileForPool(poolName) - if err := CreateErasureCodeProfile(context, clusterInfo, ecProfileName, pool); err != nil { - return errors.Wrapf(err, "failed to create erasure code profile for pool %q", poolName) + ecProfileName := GetErasureCodeProfileForPool(pool.Name) + if err := CreateErasureCodeProfile(context, clusterInfo, ecProfileName, pool.PoolSpec); err != nil { + return errors.Wrapf(err, "failed to create erasure code profile for pool %q", pool.Name) } // If the pool is not a replicated pool, then the only other option is an erasure coded pool. 
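// Note: CreateECPoolForApp now receives the NamedPoolSpec directly, so the pool name comes
// from pool.Name rather than a separate poolName argument.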
return CreateECPoolForApp( context, clusterInfo, - poolName, ecProfileName, pool, DefaultPGCount, @@ -241,7 +243,7 @@ func givePoolAppTag(context *clusterd.Context, clusterInfo *ClusterInfo, poolNam return nil } -func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.PoolSpec, poolName, appName string) error { +func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec, appName string) error { if len(pool.Parameters) == 0 { pool.Parameters = make(map[string]string) } @@ -256,58 +258,58 @@ func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo // Apply properties for propName, propValue := range pool.Parameters { - err := SetPoolProperty(context, clusterInfo, poolName, propName, propValue) + err := SetPoolProperty(context, clusterInfo, pool.Name, propName, propValue) if err != nil { - logger.Errorf("failed to set property %q to pool %q to %q. %v", propName, poolName, propValue, err) + logger.Errorf("failed to set property %q to pool %q to %q. %v", propName, pool.Name, propValue, err) } } // ensure that the newly created pool gets an application tag if appName != "" { - err := givePoolAppTag(context, clusterInfo, poolName, appName) + err := givePoolAppTag(context, clusterInfo, pool.Name, appName) if err != nil { - return errors.Wrapf(err, "failed to tag pool %q for application %q", poolName, appName) + return errors.Wrapf(err, "failed to tag pool %q for application %q", pool.Name, appName) } } // If the pool is mirrored, let's enable mirroring // we don't need to check if the pool is erasure coded or not, mirroring will still work, it will simply be slow if pool.Mirroring.Enabled { - err := enablePoolMirroring(context, clusterInfo, pool, poolName) + err := enablePoolMirroring(context, clusterInfo, pool) if err != nil { - return errors.Wrapf(err, "failed to enable mirroring for pool %q", poolName) + return errors.Wrapf(err, "failed to enable mirroring for pool %q", pool.Name) } // Schedule snapshots if pool.Mirroring.SnapshotSchedulesEnabled() && clusterInfo.CephVersion.IsAtLeastOctopus() { - err = enableSnapshotSchedules(context, clusterInfo, pool, poolName) + err = enableSnapshotSchedules(context, clusterInfo, pool) if err != nil { - return errors.Wrapf(err, "failed to enable snapshot scheduling for pool %q", poolName) + return errors.Wrapf(err, "failed to enable snapshot scheduling for pool %q", pool.Name) } } } else { if pool.Mirroring.Mode == "pool" { // Remove storage cluster peers - mirrorInfo, err := GetPoolMirroringInfo(context, clusterInfo, poolName) + mirrorInfo, err := GetPoolMirroringInfo(context, clusterInfo, pool.Name) if err != nil { - return errors.Wrapf(err, "failed to get mirroring info for the pool %q", poolName) + return errors.Wrapf(err, "failed to get mirroring info for the pool %q", pool.Name) } for _, peer := range mirrorInfo.Peers { if peer.UUID != "" { - err := removeClusterPeer(context, clusterInfo, poolName, peer.UUID) + err := removeClusterPeer(context, clusterInfo, pool.Name, peer.UUID) if err != nil { - return errors.Wrapf(err, "failed to remove cluster peer with UUID %q for the pool %q", peer.UUID, poolName) + return errors.Wrapf(err, "failed to remove cluster peer with UUID %q for the pool %q", peer.UUID, pool.Name) } } } // Disable mirroring - err = disablePoolMirroring(context, clusterInfo, poolName) + err = disablePoolMirroring(context, clusterInfo, pool.Name) if err != nil { - return errors.Wrapf(err, "failed to disable mirroring for pool %q", 
poolName) + return errors.Wrapf(err, "failed to disable mirroring for pool %q", pool.Name) } } else if pool.Mirroring.Mode == "image" { - logger.Warningf("manually disable mirroring on images in the pool %q", poolName) + logger.Warningf("manually disable mirroring on images in the pool %q", pool.Name) } } @@ -317,28 +319,28 @@ func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo maxBytesQuota, err := resource.ParseQuantity(*pool.Quotas.MaxSize) if err != nil { if err == resource.ErrFormatWrong { - return errors.Wrapf(err, "maxSize quota incorrectly formatted for pool %q, valid units include k, M, G, T, P, E, Ki, Mi, Gi, Ti, Pi, Ei", poolName) + return errors.Wrapf(err, "maxSize quota incorrectly formatted for pool %q, valid units include k, M, G, T, P, E, Ki, Mi, Gi, Ti, Pi, Ei", pool.Name) } - return errors.Wrapf(err, "failed setting quota for pool %q, maxSize quota parse error", poolName) + return errors.Wrapf(err, "failed setting quota for pool %q, maxSize quota parse error", pool.Name) } // set max_bytes quota, 0 value disables quota - err = setPoolQuota(context, clusterInfo, poolName, "max_bytes", strconv.FormatInt(maxBytesQuota.Value(), 10)) + err = setPoolQuota(context, clusterInfo, pool.Name, "max_bytes", strconv.FormatInt(maxBytesQuota.Value(), 10)) if err != nil { - return errors.Wrapf(err, "failed to set max_bytes quota for pool %q", poolName) + return errors.Wrapf(err, "failed to set max_bytes quota for pool %q", pool.Name) } } else if pool.Quotas.MaxBytes != nil { // set max_bytes quota, 0 value disables quota - err := setPoolQuota(context, clusterInfo, poolName, "max_bytes", strconv.FormatUint(*pool.Quotas.MaxBytes, 10)) + err := setPoolQuota(context, clusterInfo, pool.Name, "max_bytes", strconv.FormatUint(*pool.Quotas.MaxBytes, 10)) if err != nil { - return errors.Wrapf(err, "failed to set max_bytes quota for pool %q", poolName) + return errors.Wrapf(err, "failed to set max_bytes quota for pool %q", pool.Name) } } // set max_objects quota if pool.Quotas.MaxObjects != nil { // set max_objects quota, 0 value disables quota - err := setPoolQuota(context, clusterInfo, poolName, "max_objects", strconv.FormatUint(*pool.Quotas.MaxObjects, 10)) + err := setPoolQuota(context, clusterInfo, pool.Name, "max_objects", strconv.FormatUint(*pool.Quotas.MaxObjects, 10)) if err != nil { - return errors.Wrapf(err, "failed to set max_objects quota for pool %q", poolName) + return errors.Wrapf(err, "failed to set max_objects quota for pool %q", pool.Name) } } @@ -349,33 +351,33 @@ func GetErasureCodeProfileForPool(baseName string) string { return fmt.Sprintf("%s_ecprofile", baseName) } -func CreateECPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, poolName, ecProfileName string, pool cephv1.PoolSpec, pgCount, appName string, enableECOverwrite bool) error { - args := []string{"osd", "pool", "create", poolName, pgCount, "erasure", ecProfileName} +func CreateECPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, ecProfileName string, pool cephv1.NamedPoolSpec, pgCount, appName string, enableECOverwrite bool) error { + args := []string{"osd", "pool", "create", pool.Name, pgCount, "erasure", ecProfileName} output, err := NewCephCommand(context, clusterInfo, args).Run() if err != nil { - return errors.Wrapf(err, "failed to create EC pool %s. %s", poolName, string(output)) + return errors.Wrapf(err, "failed to create EC pool %s. 
%s", pool.Name, string(output)) } if enableECOverwrite { - if err = SetPoolProperty(context, clusterInfo, poolName, "allow_ec_overwrites", "true"); err != nil { - return errors.Wrapf(err, "failed to allow EC overwrite for pool %s", poolName) + if err = SetPoolProperty(context, clusterInfo, pool.Name, "allow_ec_overwrites", "true"); err != nil { + return errors.Wrapf(err, "failed to allow EC overwrite for pool %s", pool.Name) } } - if err = setCommonPoolProperties(context, clusterInfo, pool, poolName, appName); err != nil { + if err = setCommonPoolProperties(context, clusterInfo, pool, appName); err != nil { return err } - logger.Infof("creating EC pool %s succeeded", poolName) + logger.Infof("creating EC pool %s succeeded", pool.Name) return nil } -func CreateReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, poolName string, pool cephv1.PoolSpec, pgCount, appName string) error { +func CreateReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, pool cephv1.NamedPoolSpec, pgCount, appName string) error { // If it's a replicated pool, ensure the failure domain is desired checkFailureDomain := false // The crush rule name is the same as the pool unless we have a stretch cluster. - crushRuleName := poolName + crushRuleName := pool.Name if clusterSpec.IsStretchCluster() { // A stretch cluster enforces using the same crush rule for all pools. // The stretch cluster rule is created initially by the operator when the stretch cluster is configured @@ -383,14 +385,14 @@ func CreateReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterI crushRuleName = defaultStretchCrushRuleName } else if pool.IsHybridStoragePool() { // Create hybrid crush rule - err := createHybridCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool) + err := createHybridCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool.PoolSpec) if err != nil { return errors.Wrapf(err, "failed to create hybrid crush rule %q", crushRuleName) } } else { if pool.Replicated.ReplicasPerFailureDomain > 1 { // Create a two-step CRUSH rule for pools other than stretch clusters - err := createStretchCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool) + err := createStretchCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool.PoolSpec) if err != nil { return errors.Wrapf(err, "failed to create two-step crush rule %q", crushRuleName) } @@ -403,46 +405,46 @@ func CreateReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterI } } - args := []string{"osd", "pool", "create", poolName, pgCount, "replicated", crushRuleName, "--size", strconv.FormatUint(uint64(pool.Replicated.Size), 10)} + args := []string{"osd", "pool", "create", pool.Name, pgCount, "replicated", crushRuleName, "--size", strconv.FormatUint(uint64(pool.Replicated.Size), 10)} output, err := NewCephCommand(context, clusterInfo, args).Run() if err != nil { - return errors.Wrapf(err, "failed to create replicated pool %s. %s", poolName, string(output)) + return errors.Wrapf(err, "failed to create replicated pool %s. 
%s", pool.Name, string(output)) } if !clusterSpec.IsStretchCluster() { // the pool is type replicated, set the size for the pool now that it's been created // Only set the size if not 0, otherwise ceph will fail to set size to 0 if pool.Replicated.Size > 0 { - if err := SetPoolReplicatedSizeProperty(context, clusterInfo, poolName, strconv.FormatUint(uint64(pool.Replicated.Size), 10)); err != nil { - return errors.Wrapf(err, "failed to set size property to replicated pool %q to %d", poolName, pool.Replicated.Size) + if err := SetPoolReplicatedSizeProperty(context, clusterInfo, pool.Name, strconv.FormatUint(uint64(pool.Replicated.Size), 10)); err != nil { + return errors.Wrapf(err, "failed to set size property to replicated pool %q to %d", pool.Name, pool.Replicated.Size) } } } - if err = setCommonPoolProperties(context, clusterInfo, pool, poolName, appName); err != nil { + if err = setCommonPoolProperties(context, clusterInfo, pool, appName); err != nil { return err } - logger.Infof("creating replicated pool %s succeeded", poolName) + logger.Infof("creating replicated pool %s succeeded", pool.Name) if checkFailureDomain { - if err = ensureFailureDomain(context, clusterInfo, clusterSpec, poolName, pool); err != nil { + if err = ensureFailureDomain(context, clusterInfo, clusterSpec, pool); err != nil { return nil } } return nil } -func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, poolName string, pool cephv1.PoolSpec) error { +func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, pool cephv1.NamedPoolSpec) error { if pool.FailureDomain == "" { - logger.Debugf("skipping check for failure domain on pool %q as it is not specified", poolName) + logger.Debugf("skipping check for failure domain on pool %q as it is not specified", pool.Name) return nil } - logger.Debugf("checking that pool %q has the failure domain %q", poolName, pool.FailureDomain) - details, err := GetPoolDetails(context, clusterInfo, poolName) + logger.Debugf("checking that pool %q has the failure domain %q", pool.Name, pool.FailureDomain) + details, err := GetPoolDetails(context, clusterInfo, pool.Name) if err != nil { - return errors.Wrapf(err, "failed to get pool %q details", poolName) + return errors.Wrapf(err, "failed to get pool %q details", pool.Name) } // Find the failure domain for the current crush rule @@ -452,7 +454,7 @@ func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, cl } currentFailureDomain := extractFailureDomain(rule) if currentFailureDomain == pool.FailureDomain { - logger.Debugf("pool %q has the expected failure domain %q", poolName, pool.FailureDomain) + logger.Debugf("pool %q has the expected failure domain %q", pool.Name, pool.FailureDomain) return nil } if currentFailureDomain == "" { @@ -460,9 +462,9 @@ func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, cl } // Use a crush rule name that is unique to the desired failure domain - crushRuleName := fmt.Sprintf("%s_%s", poolName, pool.FailureDomain) - logger.Infof("updating pool %q failure domain from %q to %q with new crush rule %q", poolName, currentFailureDomain, pool.FailureDomain, crushRuleName) - logger.Infof("crush rule %q will no longer be used by pool %q", details.CrushRule, poolName) + crushRuleName := fmt.Sprintf("%s_%s", pool.Name, pool.FailureDomain) + logger.Infof("updating pool %q failure domain from %q to %q with new crush rule %q", pool.Name, currentFailureDomain, 
pool.FailureDomain, crushRuleName) + logger.Infof("crush rule %q will no longer be used by pool %q", details.CrushRule, pool.Name) // Create a new crush rule for the expected failure domain if err := createReplicationCrushRule(context, clusterInfo, clusterSpec, crushRuleName, pool); err != nil { @@ -470,11 +472,11 @@ func ensureFailureDomain(context *clusterd.Context, clusterInfo *ClusterInfo, cl } // Update the crush rule on the pool - if err := setCrushRule(context, clusterInfo, poolName, crushRuleName); err != nil { - return errors.Wrapf(err, "failed to set crush rule on pool %q", poolName) + if err := setCrushRule(context, clusterInfo, pool.Name, crushRuleName); err != nil { + return errors.Wrapf(err, "failed to set crush rule on pool %q", pool.Name) } - logger.Infof("Successfully updated pool %q failure domain to %q", poolName, pool.FailureDomain) + logger.Infof("Successfully updated pool %q failure domain to %q", pool.Name, pool.FailureDomain) return nil } @@ -631,7 +633,7 @@ func updateCrushMap(context *clusterd.Context, clusterInfo *ClusterInfo, ruleset return nil } -func createReplicationCrushRule(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, ruleName string, pool cephv1.PoolSpec) error { +func createReplicationCrushRule(context *clusterd.Context, clusterInfo *ClusterInfo, clusterSpec *cephv1.ClusterSpec, ruleName string, pool cephv1.NamedPoolSpec) error { failureDomain := pool.FailureDomain if failureDomain == "" { failureDomain = cephv1.DefaultFailureDomain diff --git a/pkg/daemon/ceph/client/pool_test.go b/pkg/daemon/ceph/client/pool_test.go index f20b554d24487..696e079704422 100644 --- a/pkg/daemon/ceph/client/pool_test.go +++ b/pkg/daemon/ceph/client/pool_test.go @@ -43,11 +43,13 @@ func TestCreateECPoolWithCompression(t *testing.T) { } func testCreateECPool(t *testing.T, overwrite bool, compressionMode string) { - poolName := "mypool" compressionModeCreated := false - p := cephv1.PoolSpec{ - FailureDomain: "host", - ErasureCoded: cephv1.ErasureCodedSpec{}, + p := cephv1.NamedPoolSpec{ + Name: "mypool", + PoolSpec: cephv1.PoolSpec{ + FailureDomain: "host", + ErasureCoded: cephv1.ErasureCodedSpec{}, + }, } if compressionMode != "" { p.CompressionMode = compressionMode @@ -86,7 +88,7 @@ func testCreateECPool(t *testing.T, overwrite bool, compressionMode string) { return "", errors.Errorf("unexpected ceph command %q", args) } - err := CreateECPoolForApp(context, AdminTestClusterInfo("mycluster"), poolName, "mypoolprofile", p, DefaultPGCount, "myapp", overwrite) + err := CreateECPoolForApp(context, AdminTestClusterInfo("mycluster"), "mypoolprofile", p, DefaultPGCount, "myapp", overwrite) assert.Nil(t, err) if compressionMode != "" { assert.True(t, compressionModeCreated) @@ -166,15 +168,18 @@ func testCreateReplicaPool(t *testing.T, failureDomain, crushRoot, deviceClass, return "", errors.Errorf("unexpected ceph command %q", args) } - p := cephv1.PoolSpec{ - FailureDomain: failureDomain, CrushRoot: crushRoot, DeviceClass: deviceClass, - Replicated: cephv1.ReplicatedSpec{Size: 12345}, + p := cephv1.NamedPoolSpec{ + Name: "mypool", + PoolSpec: cephv1.PoolSpec{ + FailureDomain: failureDomain, CrushRoot: crushRoot, DeviceClass: deviceClass, + Replicated: cephv1.ReplicatedSpec{Size: 12345}, + }, } if compressionMode != "" { p.CompressionMode = compressionMode } clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{Config: map[string]string{CrushRootConfigKey: "cluster-crush-root"}}} - err := CreateReplicatedPoolForApp(context, 
AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p, DefaultPGCount, "myapp") + err := CreateReplicatedPoolForApp(context, AdminTestClusterInfo("mycluster"), clusterSpec, p, DefaultPGCount, "myapp") assert.Nil(t, err) assert.True(t, crushRuleCreated) if compressionMode != "" { @@ -214,33 +219,42 @@ func TestUpdateFailureDomain(t *testing.T) { } t.Run("no desired failure domain", func(t *testing.T) { - p := cephv1.PoolSpec{ - Replicated: cephv1.ReplicatedSpec{Size: 3}, + p := cephv1.NamedPoolSpec{ + Name: "mypool", + PoolSpec: cephv1.PoolSpec{ + Replicated: cephv1.ReplicatedSpec{Size: 3}, + }, } clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}} - err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p) + err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, p) assert.NoError(t, err) assert.Equal(t, "", newCrushRule) }) t.Run("same failure domain", func(t *testing.T) { - p := cephv1.PoolSpec{ - FailureDomain: currentFailureDomain, - Replicated: cephv1.ReplicatedSpec{Size: 3}, + p := cephv1.NamedPoolSpec{ + Name: "mypool", + PoolSpec: cephv1.PoolSpec{ + FailureDomain: currentFailureDomain, + Replicated: cephv1.ReplicatedSpec{Size: 3}, + }, } clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}} - err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p) + err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, p) assert.NoError(t, err) assert.Equal(t, "", newCrushRule) }) t.Run("changing failure domain", func(t *testing.T) { - p := cephv1.PoolSpec{ - FailureDomain: "zone", - Replicated: cephv1.ReplicatedSpec{Size: 3}, + p := cephv1.NamedPoolSpec{ + Name: "mypool", + PoolSpec: cephv1.PoolSpec{ + FailureDomain: "zone", + Replicated: cephv1.ReplicatedSpec{Size: 3}, + }, } clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{}} - err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, "mypool", p) + err := ensureFailureDomain(context, AdminTestClusterInfo("mycluster"), clusterSpec, p) assert.NoError(t, err) assert.Equal(t, "mypool_zone", newCrushRule) }) @@ -407,17 +421,19 @@ func TestCreatePoolWithReplicasPerFailureDomain(t *testing.T) { } func testCreatePoolWithReplicasPerFailureDomain(t *testing.T, failureDomain, crushRoot, deviceClass string) { - poolName := "mypool-with-two-step-clush-rule" poolRuleCreated := false poolRuleSet := false poolAppEnable := false - poolSpec := cephv1.PoolSpec{ - FailureDomain: failureDomain, - CrushRoot: crushRoot, - DeviceClass: deviceClass, - Replicated: cephv1.ReplicatedSpec{ - Size: 12345678, - ReplicasPerFailureDomain: 2, + poolSpec := cephv1.NamedPoolSpec{ + Name: "mypool-with-two-step-clush-rule", + PoolSpec: cephv1.PoolSpec{ + FailureDomain: failureDomain, + CrushRoot: crushRoot, + DeviceClass: deviceClass, + Replicated: cephv1.ReplicatedSpec{ + Size: 12345678, + ReplicasPerFailureDomain: 2, + }, }, } @@ -432,16 +448,16 @@ func testCreatePoolWithReplicasPerFailureDomain(t *testing.T, failureDomain, cru if len(args) >= 3 && args[1] == "pool" && args[2] == "create" { // Currently, CRUSH-rule name equals pool's name assert.GreaterOrEqual(t, len(args), 7) - assert.Equal(t, args[3], poolName) + assert.Equal(t, args[3], poolSpec.Name) assert.Equal(t, args[5], "replicated") crushRuleName := args[6] - assert.Equal(t, crushRuleName, poolName) + assert.Equal(t, crushRuleName, poolSpec.Name) poolRuleCreated = true return "", nil } if len(args) 
>= 3 && args[1] == "pool" && args[2] == "set" { crushRuleName := args[3] - assert.Equal(t, crushRuleName, poolName) + assert.Equal(t, crushRuleName, poolSpec.Name) assert.Equal(t, args[4], "size") poolSize, err := strconv.Atoi(args[5]) assert.NoError(t, err) @@ -451,13 +467,13 @@ func testCreatePoolWithReplicasPerFailureDomain(t *testing.T, failureDomain, cru } if len(args) >= 4 && args[1] == "pool" && args[2] == "application" && args[3] == "enable" { crushRuleName := args[4] - assert.Equal(t, crushRuleName, poolName) + assert.Equal(t, crushRuleName, poolSpec.Name) poolAppEnable = true return "", nil } if len(args) >= 4 && args[1] == "crush" && args[2] == "rule" && args[3] == "create-replicated" { crushRuleName := args[4] - assert.Equal(t, crushRuleName, poolName) + assert.Equal(t, crushRuleName, poolSpec.Name) deviceClassName := args[7] assert.Equal(t, deviceClassName, deviceClass) poolRuleCreated = true @@ -467,7 +483,7 @@ func testCreatePoolWithReplicasPerFailureDomain(t *testing.T, failureDomain, cru } context := &clusterd.Context{Executor: executor} clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{Config: map[string]string{CrushRootConfigKey: "cluster-crush-root"}}} - err := CreateReplicatedPoolForApp(context, AdminTestClusterInfo("mycluster"), clusterSpec, poolName, poolSpec, DefaultPGCount, "myapp") + err := CreateReplicatedPoolForApp(context, AdminTestClusterInfo("mycluster"), clusterSpec, poolSpec, DefaultPGCount, "myapp") assert.Nil(t, err) assert.True(t, poolRuleCreated) assert.True(t, poolRuleSet) diff --git a/pkg/operator/ceph/controller/predicate_test.go b/pkg/operator/ceph/controller/predicate_test.go index eb1a5ab4f0cdc..11e136a634c39 100644 --- a/pkg/operator/ceph/controller/predicate_test.go +++ b/pkg/operator/ceph/controller/predicate_test.go @@ -42,9 +42,11 @@ func TestObjectChanged(t *testing.T) { Name: name, Namespace: namespace, }, - Spec: cephv1.PoolSpec{ - Replicated: cephv1.ReplicatedSpec{ - Size: oldReplicas, + Spec: cephv1.NamedPoolSpec{ + PoolSpec: cephv1.PoolSpec{ + Replicated: cephv1.ReplicatedSpec{ + Size: oldReplicas, + }, }, }, Status: &cephv1.CephBlockPoolStatus{ @@ -57,9 +59,11 @@ func TestObjectChanged(t *testing.T) { Name: name, Namespace: namespace, }, - Spec: cephv1.PoolSpec{ - Replicated: cephv1.ReplicatedSpec{ - Size: oldReplicas, + Spec: cephv1.NamedPoolSpec{ + PoolSpec: cephv1.PoolSpec{ + Replicated: cephv1.ReplicatedSpec{ + Size: oldReplicas, + }, }, }, Status: &cephv1.CephBlockPoolStatus{ diff --git a/pkg/operator/ceph/csi/peermap/config_test.go b/pkg/operator/ceph/csi/peermap/config_test.go index 24fea9686745f..f93e4eaac227f 100644 --- a/pkg/operator/ceph/csi/peermap/config_test.go +++ b/pkg/operator/ceph/csi/peermap/config_test.go @@ -204,11 +204,13 @@ var fakeSinglePeerCephBlockPool = cephv1.CephBlockPool{ Name: "mirrorPool1", Namespace: ns, }, - Spec: cephv1.PoolSpec{ - Mirroring: cephv1.MirroringSpec{ - Peers: &cephv1.MirroringPeerSpec{ - SecretNames: []string{ - "peer1Secret", + Spec: cephv1.NamedPoolSpec{ + PoolSpec: cephv1.PoolSpec{ + Mirroring: cephv1.MirroringSpec{ + Peers: &cephv1.MirroringPeerSpec{ + SecretNames: []string{ + "peer1Secret", + }, }, }, }, @@ -220,13 +222,15 @@ var fakeMultiPeerCephBlockPool = cephv1.CephBlockPool{ Name: "mirrorPool1", Namespace: ns, }, - Spec: cephv1.PoolSpec{ - Mirroring: cephv1.MirroringSpec{ - Peers: &cephv1.MirroringPeerSpec{ - SecretNames: []string{ - "peer1Secret", - "peer2Secret", - "peer3Secret", + Spec: cephv1.NamedPoolSpec{ + PoolSpec: cephv1.PoolSpec{ + Mirroring: 
cephv1.MirroringSpec{ + Peers: &cephv1.MirroringPeerSpec{ + SecretNames: []string{ + "peer1Secret", + "peer2Secret", + "peer3Secret", + }, }, }, }, diff --git a/pkg/operator/ceph/disruption/clusterdisruption/pools.go b/pkg/operator/ceph/disruption/clusterdisruption/pools.go index 41f1d6988ee6c..6cf055bd90d20 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/pools.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/pools.go @@ -46,7 +46,7 @@ func (r *ReconcileClusterDisruption) processPools(request reconcile.Request) (*c } poolCount += len(cephBlockPoolList.Items) for _, cephBlockPool := range cephBlockPoolList.Items { - poolSpecs = append(poolSpecs, cephBlockPool.Spec) + poolSpecs = append(poolSpecs, cephBlockPool.Spec.PoolSpec) } cephFilesystemList := &cephv1.CephFilesystemList{} diff --git a/pkg/operator/ceph/file/filesystem.go b/pkg/operator/ceph/file/filesystem.go index 2430c9b08f323..f725f77be8ea2 100644 --- a/pkg/operator/ceph/file/filesystem.go +++ b/pkg/operator/ceph/file/filesystem.go @@ -151,21 +151,27 @@ func newFS(name, namespace string) *Filesystem { } } -// SetPoolSize function sets the sizes for MetadataPool and dataPool -func SetPoolSize(f *Filesystem, context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, spec cephv1.FilesystemSpec) error { +// createOrUpdatePools function sets the sizes for MetadataPool and dataPool +func createOrUpdatePools(f *Filesystem, context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, spec cephv1.FilesystemSpec) error { // generating the metadata pool's name - metadataPoolName := generateMetaDataPoolName(f) - err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, metadataPoolName, spec.MetadataPool, "") + metadataPool := cephv1.NamedPoolSpec{ + Name: generateMetaDataPoolName(f), + PoolSpec: spec.MetadataPool, + } + err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, metadataPool, "") if err != nil { - return errors.Wrapf(err, "failed to update metadata pool %q", metadataPoolName) + return errors.Wrapf(err, "failed to update metadata pool %q", metadataPool.Name) } // generating the data pool's name dataPoolNames := generateDataPoolNames(f, spec) for i, pool := range spec.DataPools { - poolName := dataPoolNames[i] - err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, poolName, pool, "") + dataPool := cephv1.NamedPoolSpec{ + Name: dataPoolNames[i], + PoolSpec: pool, + } + err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, dataPool, "") if err != nil { - return errors.Wrapf(err, "failed to update datapool %q", poolName) + return errors.Wrapf(err, "failed to update datapool %q", dataPool.Name) } } return nil @@ -183,7 +189,7 @@ func (f *Filesystem) updateFilesystem(context *clusterd.Context, clusterInfo *ce ) } - if err := SetPoolSize(f, context, clusterInfo, clusterSpec, spec); err != nil { + if err := createOrUpdatePools(f, context, clusterInfo, clusterSpec, spec); err != nil { return errors.Wrap(err, "failed to set pools size") } @@ -231,25 +237,31 @@ func (f *Filesystem) doFilesystemCreate(context *clusterd.Context, clusterInfo * reversedPoolMap[value] = key } - metadataPoolName := generateMetaDataPoolName(f) - if _, poolFound := reversedPoolMap[metadataPoolName]; !poolFound { - err = cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, metadataPoolName, spec.MetadataPool, "") + metadataPool := cephv1.NamedPoolSpec{ + Name: 
generateMetaDataPoolName(f), + PoolSpec: spec.MetadataPool, + } + if _, poolFound := reversedPoolMap[metadataPool.Name]; !poolFound { + err = cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, metadataPool, "") if err != nil { - return errors.Wrapf(err, "failed to create metadata pool %q", metadataPoolName) + return errors.Wrapf(err, "failed to create metadata pool %q", metadataPool.Name) } } dataPoolNames := generateDataPoolNames(f, spec) for i, pool := range spec.DataPools { - poolName := dataPoolNames[i] - if _, poolFound := reversedPoolMap[poolName]; !poolFound { - err = cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, poolName, pool, "") + dataPool := cephv1.NamedPoolSpec{ + Name: dataPoolNames[i], + PoolSpec: pool, + } + if _, poolFound := reversedPoolMap[dataPool.Name]; !poolFound { + err = cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, dataPool, "") if err != nil { - return errors.Wrapf(err, "failed to create data pool %q", poolName) + return errors.Wrapf(err, "failed to create data pool %q", dataPool.Name) } if pool.IsErasureCoded() { // An erasure coded data pool used for a filesystem must allow overwrites - if err := cephclient.SetPoolProperty(context, clusterInfo, poolName, "allow_ec_overwrites", "true"); err != nil { + if err := cephclient.SetPoolProperty(context, clusterInfo, dataPool.Name, "allow_ec_overwrites", "true"); err != nil { logger.Warningf("failed to set ec pool property. %v", err) } } @@ -258,11 +270,11 @@ func (f *Filesystem) doFilesystemCreate(context *clusterd.Context, clusterInfo * // create the filesystem ('fs new' needs to be forced in order to reuse pre-existing pools) // if only one pool is created new it won't work (to avoid inconsistencies). - if err := cephclient.CreateFilesystem(context, clusterInfo, f.Name, metadataPoolName, dataPoolNames); err != nil { + if err := cephclient.CreateFilesystem(context, clusterInfo, f.Name, metadataPool.Name, dataPoolNames); err != nil { return err } - logger.Infof("created filesystem %q on %d data pool(s) and metadata pool %q", f.Name, len(dataPoolNames), metadataPoolName) + logger.Infof("created filesystem %q on %d data pool(s) and metadata pool %q", f.Name, len(dataPoolNames), metadataPool.Name) return nil } diff --git a/pkg/operator/ceph/nfs/controller.go b/pkg/operator/ceph/nfs/controller.go index 66118feb2489c..ebcbb2d2cb344 100644 --- a/pkg/operator/ceph/nfs/controller.go +++ b/pkg/operator/ceph/nfs/controller.go @@ -279,12 +279,6 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul return reconcile.Result{}, errors.Wrapf(err, "invalid ceph nfs %q arguments", cephNFS.Name) } - // Always create the default pool - err = r.createDefaultNFSRADOSPool(cephNFS) - if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to create default pool %q", cephNFS.Spec.RADOS.Pool) - } - // CREATE/UPDATE logger.Debug("reconciling ceph nfs deployments") _, err = r.reconcileCreateCephNFS(cephNFS) diff --git a/pkg/operator/ceph/nfs/nfs.go b/pkg/operator/ceph/nfs/nfs.go index e68968e3952b8..85a2478ffa88e 100644 --- a/pkg/operator/ceph/nfs/nfs.go +++ b/pkg/operator/ceph/nfs/nfs.go @@ -289,22 +289,3 @@ func validateGanesha(context *clusterd.Context, clusterInfo *cephclient.ClusterI return nil } - -// create and enable default RADOS pool -func (r *ReconcileCephNFS) createDefaultNFSRADOSPool(n *cephv1.CephNFS) error { - poolName := n.Spec.RADOS.Pool - - args := []string{"osd", "pool", "create", poolName} - output, err := 
cephclient.NewCephCommand(r.context, r.clusterInfo, args).Run() - if err != nil { - return errors.Wrapf(err, "failed to create default NFS pool %q. %s", poolName, string(output)) - } - - args = []string{"osd", "pool", "application", "enable", poolName, "nfs", "--yes-i-really-mean-it"} - _, err = cephclient.NewCephCommand(r.context, r.clusterInfo, args).Run() - if err != nil { - return errors.Wrapf(err, "failed to enable application 'nfs' on pool %q", poolName) - } - - return nil -} diff --git a/pkg/operator/ceph/object/objectstore.go b/pkg/operator/ceph/object/objectstore.go index 2958242ccb5b7..d5f270e213745 100644 --- a/pkg/operator/ceph/object/objectstore.go +++ b/pkg/operator/ceph/object/objectstore.go @@ -782,22 +782,25 @@ func createSimilarPools(ctx *Context, pools []string, clusterSpec *cephv1.Cluste return nil } -func createRGWPool(ctx *Context, clusterSpec *cephv1.ClusterSpec, poolSpec cephv1.PoolSpec, pgCount, ecProfileName, pool string) error { +func createRGWPool(ctx *Context, clusterSpec *cephv1.ClusterSpec, poolSpec cephv1.PoolSpec, pgCount, ecProfileName, requestedName string) error { // create the pool if it doesn't exist yet - name := poolName(ctx.Name, pool) - if poolDetails, err := cephclient.GetPoolDetails(ctx.Context, ctx.clusterInfo, name); err != nil { + pool := cephv1.NamedPoolSpec{ + Name: poolName(ctx.Name, requestedName), + PoolSpec: poolSpec, + } + if poolDetails, err := cephclient.GetPoolDetails(ctx.Context, ctx.clusterInfo, pool.Name); err != nil { // If the ceph config has an EC profile, an EC pool must be created. Otherwise, it's necessary // to create a replicated pool. var err error if poolSpec.IsErasureCoded() { // An EC pool backing an object store does not need to enable EC overwrites, so the pool is // created with that property disabled to avoid unnecessary performance impact. 
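// As with the block pool and filesystem callers, the NamedPoolSpec built above carries the
// generated pool name, so the create helpers below no longer take a separate name argument.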
-		err = cephclient.CreateECPoolForApp(ctx.Context, ctx.clusterInfo, name, ecProfileName, poolSpec, pgCount, AppName, false /* enableECOverwrite */)
+		err = cephclient.CreateECPoolForApp(ctx.Context, ctx.clusterInfo, ecProfileName, pool, pgCount, AppName, false /* enableECOverwrite */)
 		} else {
-			err = cephclient.CreateReplicatedPoolForApp(ctx.Context, ctx.clusterInfo, clusterSpec, name, poolSpec, pgCount, AppName)
+			err = cephclient.CreateReplicatedPoolForApp(ctx.Context, ctx.clusterInfo, clusterSpec, pool, pgCount, AppName)
 		}
 		if err != nil {
-			return errors.Wrapf(err, "failed to create pool %s for object store %s.", name, ctx.Name)
+			return errors.Wrapf(err, "failed to create pool %s for object store %s.", pool.Name, ctx.Name)
 		}
 	} else {
 		// pools already exist
@@ -813,8 +816,8 @@ func createRGWPool(ctx *Context, clusterSpec *cephv1.ClusterSpec, poolSpec cephv
 	}
 	// Set the pg_num_min if not the default so the autoscaler won't immediately increase the pg count
 	if pgCount != cephclient.DefaultPGCount {
-		if err := cephclient.SetPoolProperty(ctx.Context, ctx.clusterInfo, name, "pg_num_min", pgCount); err != nil {
-			return errors.Wrapf(err, "failed to set pg_num_min on pool %q to %q", name, pgCount)
+		if err := cephclient.SetPoolProperty(ctx.Context, ctx.clusterInfo, pool.Name, "pg_num_min", pgCount); err != nil {
+			return errors.Wrapf(err, "failed to set pg_num_min on pool %q to %q", pool.Name, pgCount)
 		}
 	}
diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go
index a5c253f01da57..1ec3b6527523d 100644
--- a/pkg/operator/ceph/pool/controller.go
+++ b/pkg/operator/ceph/pool/controller.go
@@ -158,6 +158,12 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 		return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to get CephBlockPool")
 	}

+	cephPoolName, err := getCephName(cephBlockPool)
+	if err != nil {
+		return opcontroller.ImmediateRetryResult, errors.Wrap(err, "unexpected pool name")
+	}
+	cephBlockPool.Spec.Name = cephPoolName
+
 	// Set a finalizer so we can do cleanup before the object goes away
 	err = opcontroller.AddFinalizerIfNotPresent(r.opManagerContext, r.client, cephBlockPool)
 	if err != nil {
@@ -221,7 +227,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 		r.cancelMirrorMonitoring(cephBlockPool)

 		logger.Infof("deleting pool %q", cephBlockPool.Name)
-		err := deletePool(r.context, clusterInfo, cephBlockPool)
+		err := deletePool(r.context, clusterInfo, &cephBlockPool.Spec)
 		if err != nil {
 			return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to delete pool %q. ", cephBlockPool.Name)
 		}
@@ -242,7 +248,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 	}

 	// validate the pool settings
-	if err := ValidatePool(r.context, clusterInfo, &cephCluster.Spec, cephBlockPool); err != nil {
+	if err := validatePool(r.context, clusterInfo, &cephCluster.Spec, cephBlockPool); err != nil {
 		if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) {
 			logger.Info(opcontroller.OperatorNotInitializedMessage)
 			return opcontroller.WaitForRequeueIfOperatorNotInitialized, nil
@@ -273,7 +279,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 		return reconcile.Result{}, errors.Wrap(err, "failed to enable/disable stats collection for pool(s)")
 	}

-	checker := newMirrorChecker(r.context, r.client, r.clusterInfo, request.NamespacedName, &cephBlockPool.Spec, cephBlockPool.Name)
+	checker := newMirrorChecker(r.context, r.client, r.clusterInfo, request.NamespacedName, &cephBlockPool.Spec)
 	// ADD PEERS
 	logger.Debug("reconciling create rbd mirror peer configuration")
 	if cephBlockPool.Spec.Mirroring.Enabled {
@@ -332,7 +338,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 }

 func (r *ReconcileCephBlockPool) reconcileCreatePool(clusterInfo *cephclient.ClusterInfo, cephCluster *cephv1.ClusterSpec, cephBlockPool *cephv1.CephBlockPool) (reconcile.Result, error) {
-	err := createPool(r.context, clusterInfo, cephCluster, cephBlockPool)
+	err := createPool(r.context, clusterInfo, cephCluster, &cephBlockPool.Spec)
 	if err != nil {
 		return opcontroller.ImmediateRetryResult, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName())
 	}
@@ -341,11 +347,31 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 	return reconcile.Result{}, nil
 }

+func getCephName(pool *cephv1.CephBlockPool) (string, error) {
+	if pool.Name == "" {
+		return "", errors.Errorf("metadata name must be set")
+	}
+	// If the name is not overridden in the pool spec.name, set it to the name of the pool CR
+	if pool.Spec.Name == "" {
+		return pool.Name, nil
+	}
+	if pool.Spec.Name == pool.Name {
+		// The same pool name is set, nothing is being overridden
+		return pool.Name, nil
+	}
+	// Only allow the name to be overridden for certain built-in pool names
+	if pool.Spec.Name == "device_health_metrics" || pool.Spec.Name == ".nfs" {
+		return pool.Spec.Name, nil
+	}
+
+	return "", errors.Errorf("not allowed to override the pool name except for the device_health_metrics and .nfs pools")
+}
+
 // Create the pool
-func createPool(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, p *cephv1.CephBlockPool) error {
+func createPool(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, p *cephv1.NamedPoolSpec) error {
 	// create the pool
-	logger.Infof("creating pool %q in namespace %q", p.Name, p.Namespace)
-	if err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, p.Name, p.Spec, poolApplicationNameRBD); err != nil {
+	logger.Infof("creating pool %q in namespace %q", p.Name, clusterInfo.Namespace)
+	if err := cephclient.CreatePoolWithProfile(context, clusterInfo, clusterSpec, *p, poolApplicationNameRBD); err != nil {
 		return errors.Wrapf(err, "failed to create pool %q", p.Name)
 	}

@@ -361,7 +387,7 @@ func createPool(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo,
 }

 // Delete the pool
-func deletePool(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, p *cephv1.CephBlockPool) error {
+func deletePool(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, p *cephv1.NamedPoolSpec) error {
 	pools, err := cephclient.ListPoolSummaries(context, clusterInfo)
 	if err != nil {
 		return errors.Wrap(err, "failed to list pools")
@@ -391,8 +417,11 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
 	}
 	for _, cephBlockPool := range cephBlockPoolList.Items {
 		if cephBlockPool.GetDeletionTimestamp() == nil && cephBlockPool.Spec.EnableRBDStats {
-			// list of CephBlockPool with enableRBDStats set to true and not marked for deletion
-			enableStatsForPools = append(enableStatsForPools, cephBlockPool.Name)
+			cephPoolName, err := getCephName(&cephBlockPool)
+			if err == nil {
+				// add to list of CephBlockPool with enableRBDStats set to true and not marked for deletion
+				enableStatsForPools = append(enableStatsForPools, cephPoolName)
+			}
 		}
 	}
 	logger.Debugf("RBD per-image IO statistics will be collected for pools: %v", enableStatsForPools)
diff --git a/pkg/operator/ceph/pool/controller_test.go b/pkg/operator/ceph/pool/controller_test.go
index 16fbcd43a8cf0..8eb702ebf6882 100644
--- a/pkg/operator/ceph/pool/controller_test.go
+++ b/pkg/operator/ceph/pool/controller_test.go
@@ -60,22 +60,59 @@ func TestCreatePool(t *testing.T) {
 	}
 	context := &clusterd.Context{Executor: executor}

-	p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
-	p.Spec.Replicated.Size = 1
-	p.Spec.Replicated.RequireSafeReplicaSize = false
+	p := &cephv1.NamedPoolSpec{Name: "mypool"}
+	p.Replicated.Size = 1
+	p.Replicated.RequireSafeReplicaSize = false

 	clusterSpec := &cephv1.ClusterSpec{Storage: cephv1.StorageScopeSpec{Config: map[string]string{cephclient.CrushRootConfigKey: "cluster-crush-root"}}}
 	err := createPool(context, clusterInfo, clusterSpec, p)
 	assert.Nil(t, err)

 	// succeed with EC
-	p.Spec.Replicated.Size = 0
-	p.Spec.ErasureCoded.CodingChunks = 1
-	p.Spec.ErasureCoded.DataChunks = 2
+	p.Replicated.Size = 0
+	p.ErasureCoded.CodingChunks = 1
+	p.ErasureCoded.DataChunks = 2
 	err = createPool(context, clusterInfo, clusterSpec, p)
 	assert.Nil(t, err)
 }

+func TestCephPoolName(t *testing.T) {
+	t.Run("metadata name must be set", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{}
+		_, err := getCephName(p)
+		assert.Error(t, err)
+	})
+	t.Run("spec not set", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
+		name, err := getCephName(p)
+		assert.NoError(t, err)
+		assert.Equal(t, "foo", name)
+	})
+	t.Run("same name already set", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: cephv1.NamedPoolSpec{Name: "foo"}}
+		name, err := getCephName(p)
+		assert.NoError(t, err)
+		assert.Equal(t, "foo", name)
+	})
+	t.Run("cannot override unexpected name", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: cephv1.NamedPoolSpec{Name: "bar"}}
+		_, err := getCephName(p)
+		assert.Error(t, err)
+	})
+	t.Run("override device metrics", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "device-metrics"}, Spec: cephv1.NamedPoolSpec{Name: "device_health_metrics"}}
+		name, err := getCephName(p)
+		assert.NoError(t, err)
+		assert.Equal(t, "device_health_metrics", name)
+	})
+	t.Run("override nfs", func(t *testing.T) {
+		p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "default-nfs"}, Spec: cephv1.NamedPoolSpec{Name: ".nfs"}}
+		name, err := getCephName(p)
+		assert.NoError(t, err)
+		assert.Equal(t, ".nfs", name)
+	})
+}
+
 func TestDeletePool(t *testing.T) {
 	failOnDelete := false
 	clusterInfo := cephclient.AdminTestClusterInfo("mycluster")
@@ -105,18 +142,18 @@ func TestDeletePool(t *testing.T) {
 	context := &clusterd.Context{Executor: executor}

 	// delete a pool that exists
-	p := &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
+	p := &cephv1.NamedPoolSpec{Name: "mypool"}
 	err := deletePool(context, clusterInfo, p)
 	assert.Nil(t, err)

 	// succeed even if the pool doesn't exist
-	p = &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "otherpool", Namespace: clusterInfo.Namespace}}
+	p = &cephv1.NamedPoolSpec{Name: "otherpool"}
 	err = deletePool(context, clusterInfo, p)
 	assert.Nil(t, err)

 	// fail if images/snapshosts exist in the pool
 	failOnDelete = true
-	p = &cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
+	p = &cephv1.NamedPoolSpec{Name: "mypool"}
 	err = deletePool(context, clusterInfo, p)
 	assert.NotNil(t, err)
 }
@@ -141,16 +178,18 @@ func TestCephBlockPoolController(t *testing.T) {
 			Namespace: namespace,
 			UID:       types.UID("c47cac40-9bee-4d52-823b-ccd803ba5bfe"),
 		},
-		Spec: cephv1.PoolSpec{
-			Replicated: cephv1.ReplicatedSpec{
-				Size: replicas,
-			},
-			Mirroring: cephv1.MirroringSpec{
-				Peers: &cephv1.MirroringPeerSpec{},
-			},
-			StatusCheck: cephv1.MirrorHealthCheckSpec{
-				Mirror: cephv1.HealthCheckSpec{
-					Disabled: true,
+		Spec: cephv1.NamedPoolSpec{
+			PoolSpec: cephv1.PoolSpec{
+				Replicated: cephv1.ReplicatedSpec{
+					Size: replicas,
+				},
+				Mirroring: cephv1.MirroringSpec{
+					Peers: &cephv1.MirroringPeerSpec{},
+				},
+				StatusCheck: cephv1.MirrorHealthCheckSpec{
+					Mirror: cephv1.HealthCheckSpec{
+						Disabled: true,
+					},
 				},
 			},
 		},
@@ -517,9 +556,11 @@ func TestConfigureRBDStats(t *testing.T) {
 			Name:      "my-pool-without-rbd-stats",
 			Namespace: namespace,
 		},
-		Spec: cephv1.PoolSpec{
-			Replicated: cephv1.ReplicatedSpec{
-				Size: 3,
+		Spec: cephv1.NamedPoolSpec{
+			PoolSpec: cephv1.PoolSpec{
+				Replicated: cephv1.ReplicatedSpec{
+					Size: 3,
+				},
 			},
 		},
 	}
diff --git a/pkg/operator/ceph/pool/health.go b/pkg/operator/ceph/pool/health.go
index 5d9dd473e6e5f..90731bf913e9f 100644
--- a/pkg/operator/ceph/pool/health.go
+++ b/pkg/operator/ceph/pool/health.go
@@ -37,12 +37,11 @@ type mirrorChecker struct {
 	client         client.Client
 	clusterInfo    *cephclient.ClusterInfo
 	namespacedName types.NamespacedName
-	poolSpec       *cephv1.PoolSpec
-	poolName       string
+	poolSpec       *cephv1.NamedPoolSpec
 }

 // newMirrorChecker creates a new HealthChecker object
-func newMirrorChecker(context *clusterd.Context, client client.Client, clusterInfo *cephclient.ClusterInfo, namespacedName types.NamespacedName, poolSpec *cephv1.PoolSpec, poolName string) *mirrorChecker {
+func newMirrorChecker(context *clusterd.Context, client client.Client, clusterInfo *cephclient.ClusterInfo, namespacedName types.NamespacedName, poolSpec *cephv1.NamedPoolSpec) *mirrorChecker {
 	c := &mirrorChecker{
 		context:  context,
 		interval: &defaultHealthCheckInterval,
@@ -50,7 +49,6 @@ func newMirrorChecker(context *clusterd.Context, client client.Client, clusterIn
 		namespacedName: namespacedName,
 		client:         client,
 		poolSpec:       poolSpec,
-		poolName:       poolName,
 	}

 	// allow overriding the check interval
@@ -91,13 +89,13 @@ func (c *mirrorChecker) checkMirroring(context context.Context) {

 func (c *mirrorChecker) checkMirroringHealth() error {
 	// Check mirroring status
-	mirrorStatus, err := cephclient.GetPoolMirroringStatus(c.context, c.clusterInfo, c.poolName)
+	mirrorStatus, err := cephclient.GetPoolMirroringStatus(c.context, c.clusterInfo, c.poolSpec.Name)
 	if err != nil {
 		c.updateStatusMirroring(nil, nil, nil, err.Error())
 	}

 	// Check mirroring info
-	mirrorInfo, err := cephclient.GetPoolMirroringInfo(c.context, c.clusterInfo, c.poolName)
+	mirrorInfo, err := cephclient.GetPoolMirroringInfo(c.context, c.clusterInfo, c.poolSpec.Name)
 	if err != nil {
 		c.updateStatusMirroring(nil, nil, nil, err.Error())
 	}
@@ -106,7 +104,7 @@ func (c *mirrorChecker) checkMirroringHealth() error {
 	// snapSchedStatus := cephclient.SnapshotScheduleStatus{}
 	snapSchedStatus := []cephv1.SnapshotSchedulesSpec{}
 	if c.poolSpec.Mirroring.SnapshotSchedulesEnabled() {
-		snapSchedStatus, err = cephclient.ListSnapshotSchedulesRecursively(c.context, c.clusterInfo, c.poolName)
+		snapSchedStatus, err = cephclient.ListSnapshotSchedulesRecursively(c.context, c.clusterInfo, c.poolSpec.Name)
 		if err != nil {
 			c.updateStatusMirroring(nil, nil, nil, err.Error())
 		}
diff --git a/pkg/operator/ceph/pool/validate.go b/pkg/operator/ceph/pool/validate.go
index 44d96c113b655..9ea3857859762 100644
--- a/pkg/operator/ceph/pool/validate.go
+++ b/pkg/operator/ceph/pool/validate.go
@@ -25,15 +25,15 @@ import (
 	cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
 )

-// ValidatePool Validate the pool arguments
-func ValidatePool(context *clusterd.Context, clusterInfo *client.ClusterInfo, clusterSpec *cephv1.ClusterSpec, p *cephv1.CephBlockPool) error {
+// validatePool Validate the pool arguments
+func validatePool(context *clusterd.Context, clusterInfo *client.ClusterInfo, clusterSpec *cephv1.ClusterSpec, p *cephv1.CephBlockPool) error {
 	if p.Name == "" {
 		return errors.New("missing name")
 	}
 	if p.Namespace == "" {
 		return errors.New("missing namespace")
 	}
-	if err := ValidatePoolSpec(context, clusterInfo, clusterSpec, &p.Spec); err != nil {
+	if err := ValidatePoolSpec(context, clusterInfo, clusterSpec, &p.Spec.PoolSpec); err != nil {
 		return err
 	}
 	return nil
@@ -181,8 +181,7 @@ func ValidatePoolSpec(context *clusterd.Context, clusterInfo *cephclient.Cluster
 }

 // validateDeviceClasses validates the primary and secondary device classes in the HybridStorageSpec
-func validateDeviceClasses(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo,
-	p *cephv1.PoolSpec) error {
+func validateDeviceClasses(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, p *cephv1.PoolSpec) error {
 	primaryDeviceClass := p.Replicated.HybridStorage.PrimaryDeviceClass
 	secondaryDeviceClass := p.Replicated.HybridStorage.SecondaryDeviceClass

@@ -201,8 +200,7 @@ func validateDeviceClasses(context *clusterd.Context, clusterInfo *cephclient.Cl
 }

 // validateDeviceClassOSDs validates that the device class should have atleast one OSD
-func validateDeviceClassOSDs(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo,
-	deviceClassName string) error {
+func validateDeviceClassOSDs(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, deviceClassName string) error {
 	deviceClassOSDs, err := cephclient.GetDeviceClassOSDs(context, clusterInfo, deviceClassName)
 	if err != nil {
 		return errors.Wrapf(err, "failed to get osds for the device class %q", deviceClassName)
diff --git a/pkg/operator/ceph/pool/validate_test.go b/pkg/operator/ceph/pool/validate_test.go
index edef5abf6c667..31b4c389732b0 100644
--- a/pkg/operator/ceph/pool/validate_test.go
+++ b/pkg/operator/ceph/pool/validate_test.go
@@ -36,19 +36,19 @@ func TestValidatePool(t *testing.T) {

 	t.Run("not specifying some replication or EC settings is fine", func(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

 	t.Run("must specify name", func(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Namespace: clusterInfo.Namespace}}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

 	t.Run("must specify namespace", func(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool"}}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -58,7 +58,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.Replicated.RequireSafeReplicaSize = false
 		p.Spec.ErasureCoded.CodingChunks = 2
 		p.Spec.ErasureCoded.DataChunks = 3
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -66,7 +66,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 1
 		p.Spec.Replicated.RequireSafeReplicaSize = false
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -74,7 +74,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 1
 		p.Spec.Replicated.RequireSafeReplicaSize = true
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -82,7 +82,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.ErasureCoded.CodingChunks = 1
 		p.Spec.ErasureCoded.DataChunks = 2
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -91,7 +91,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.Replicated.Size = 1
 		p.Spec.Replicated.RequireSafeReplicaSize = false
 		p.Spec.Parameters = map[string]string{"compression_mode": "foo"}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 		assert.EqualError(t, err, "failed to validate pool spec unknown compression mode \"foo\"")
 		assert.Equal(t, "foo", p.Spec.Parameters["compression_mode"])
@@ -102,7 +102,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.Replicated.Size = 1
 		p.Spec.Replicated.RequireSafeReplicaSize = false
 		p.Spec.Parameters = map[string]string{"compression_mode": "aggressive"}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -110,7 +110,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 1
 		p.Spec.Replicated.ReplicasPerFailureDomain = 2
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -118,7 +118,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 2
 		p.Spec.Replicated.ReplicasPerFailureDomain = 2
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -126,7 +126,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 4
 		p.Spec.Replicated.ReplicasPerFailureDomain = 3
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -134,14 +134,14 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.Size = 4
 		p.Spec.Replicated.ReplicasPerFailureDomain = 5
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

 	t.Run("failure the sub domain does not exist", func(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Replicated.SubFailureDomain = "dummy"
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 	})

@@ -150,7 +150,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.ErasureCoded.CodingChunks = 1
 		p.Spec.ErasureCoded.DataChunks = 2
 		p.Spec.CompressionMode = "passive"
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -158,7 +158,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Mirroring.Enabled = true
 		p.Spec.Mirroring.Mode = "foo"
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 		assert.EqualError(t, err, "unrecognized mirroring mode \"foo\". only 'image and 'pool' are supported")
 	})

@@ -167,7 +167,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.Mirroring.Enabled = true
 		p.Spec.Mirroring.Mode = "pool"
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -176,7 +176,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.Mirroring.Enabled = true
 		p.Spec.Mirroring.Mode = "pool"
 		p.Spec.Mirroring.SnapshotSchedules = []cephv1.SnapshotScheduleSpec{{StartTime: "14:00:00-05:00"}}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 		assert.EqualError(t, err, "schedule interval cannot be empty if start time is specified")
 	})

@@ -186,7 +186,7 @@ func TestValidatePool(t *testing.T) {
 		p.Spec.Mirroring.Enabled = true
 		p.Spec.Mirroring.Mode = "pool"
 		p.Spec.Mirroring.SnapshotSchedules = []cephv1.SnapshotScheduleSpec{{Interval: "24h"}}
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.NoError(t, err)
 	})

@@ -194,7 +194,7 @@ func TestValidatePool(t *testing.T) {
 		p := cephv1.CephBlockPool{ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace}}
 		p.Spec.FailureDomain = "host"
 		p.Spec.Replicated.SubFailureDomain = "host"
-		err := ValidatePool(context, clusterInfo, clusterSpec, &p)
+		err := validatePool(context, clusterInfo, clusterSpec, &p)
 		assert.Error(t, err)
 		assert.EqualError(t, err, "failure and subfailure domain cannot be identical")
 	})
@@ -215,35 +215,37 @@ func TestValidateCrushProperties(t *testing.T) {

 	// succeed with a failure domain that exists
 	p := &cephv1.CephBlockPool{
 		ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace},
-		Spec: cephv1.PoolSpec{
-			Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false},
+		Spec: cephv1.NamedPoolSpec{
+			PoolSpec: cephv1.PoolSpec{
+				Replicated: cephv1.ReplicatedSpec{Size: 1, RequireSafeReplicaSize: false},
+			},
 		},
 	}
 	clusterSpec := &cephv1.ClusterSpec{}
-	err := ValidatePool(context, clusterInfo, clusterSpec, p)
+	err := validatePool(context, clusterInfo, clusterSpec, p)
 	assert.Nil(t, err)

 	// fail with a failure domain that doesn't exist
 	p.Spec.FailureDomain = "doesntexist"
-	err = ValidatePool(context, clusterInfo, clusterSpec, p)
+	err = validatePool(context, clusterInfo, clusterSpec, p)
 	assert.NotNil(t, err)

 	// fail with a crush root that doesn't exist
 	p.Spec.FailureDomain = "osd"
 	p.Spec.CrushRoot = "bad"
-	err = ValidatePool(context, clusterInfo, clusterSpec, p)
+	err = validatePool(context, clusterInfo, clusterSpec, p)
 	assert.NotNil(t, err)

 	// fail with a crush root that does exist
 	p.Spec.CrushRoot = "good"
-	err = ValidatePool(context, clusterInfo, clusterSpec, p)
+	err = validatePool(context, clusterInfo, clusterSpec, p)
 	assert.Nil(t, err)

 	// Success replica size is 4 and replicasPerFailureDomain is 2
 	p.Spec.Replicated.Size = 4
 	p.Spec.Replicated.ReplicasPerFailureDomain = 2
-	err = ValidatePool(context, clusterInfo, clusterSpec, p)
+	err = validatePool(context, clusterInfo, clusterSpec, p)
 	assert.NoError(t, err)
 }

@@ -304,16 +306,13 @@ func TestValidateDeviceClasses(t *testing.T) {
 				return "", nil
 			}

-			p := &cephv1.CephBlockPool{
-				ObjectMeta: metav1.ObjectMeta{Name: "mypool", Namespace: clusterInfo.Namespace},
-				Spec: cephv1.PoolSpec{
-					Replicated: cephv1.ReplicatedSpec{
-						HybridStorage: tc.hybridStorageSpec,
-					},
+			p := &cephv1.PoolSpec{
+				Replicated: cephv1.ReplicatedSpec{
+					HybridStorage: tc.hybridStorageSpec,
 				},
 			}

-			err := validateDeviceClasses(context, clusterInfo, &p.Spec)
+			err := validateDeviceClasses(context, clusterInfo, p)
 			if tc.isValidSpec {
 				assert.NoError(t, err)
 			} else {