Skip to content

Commit

Permalink
core: treat cluster as not existing if the cleanup policy is set
Browse files Browse the repository at this point in the history
The cluster CR can be forcefully deleted and cleanup the
cluster resources if the yes-really-destroy-data policy
is set on the CR. In this case, the other controllers should
treat the cluster CR as not existing and allow the finalizers
to be removed on those resources if they are requested for
deletion.

Signed-off-by: Travis Nielsen <tnielsen@redhat.com>
  • Loading branch information
travisn committed Oct 27, 2021
1 parent c97f70b commit fd10d98
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/client/controller.go
Expand Up @@ -152,7 +152,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller.go
Expand Up @@ -172,7 +172,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
11 changes: 8 additions & 3 deletions pkg/operator/ceph/controller/controller_utils.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -114,7 +113,7 @@ func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, control
}

// IsReadyToReconcile determines if a controller is ready to reconcile or not
func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
func IsReadyToReconcile(ctx context.Context, c client.Client, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
cephClusterExists := false

// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
Expand All @@ -130,9 +129,15 @@ func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *cl
logger.Debugf("%q: no CephCluster resource found in namespace %q", controllerName, namespacedName.Namespace)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
cephClusterExists = true
cephCluster = clusterList.Items[0]

// If the cluster has a cleanup policy to destroy the cluster and it has been marked for deletion, treat it as if it does not exist
if cephCluster.Spec.CleanupPolicy.HasDataDirCleanPolicy() && !cephCluster.DeletionTimestamp.IsZero() {
logger.Infof("%q: CephCluster %q has a destructive cleanup policy, allowing resources to be deleted", controllerName, namespacedName)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}

cephClusterExists = true
logger.Debugf("%q: CephCluster resource %q found in namespace %q", controllerName, cephCluster.Name, namespacedName.Namespace)

// read the CR status of the cluster
Expand Down
91 changes: 91 additions & 0 deletions pkg/operator/ceph/controller/controller_utils_test.go
Expand Up @@ -17,12 +17,18 @@ limitations under the License.
package controller

import (
ctx "context"
"testing"
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/util/exec"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func CreateTestClusterFromStatusDetails(details map[string]cephv1.CephHealthMessage) cephv1.CephCluster {
Expand Down Expand Up @@ -85,3 +91,88 @@ func TestSetCephCommandsTimeout(t *testing.T) {
SetCephCommandsTimeout(map[string]string{"ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS": "1"})
assert.Equal(t, 1*time.Second, exec.CephCommandsTimeout)
}

func TestIsReadyToReconcile(t *testing.T) {
scheme := scheme.Scheme
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

controllerName := "testing"
clusterName := types.NamespacedName{Name: "mycluster", Namespace: "myns"}

t.Run("non-existent cephcluster", func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("valid cephcluster", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("deleted cephcluster with no cleanup policy", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
}

objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("cephcluster with cleanup policy when not deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
})

t.Run("cephcluster with cleanup policy when deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
})
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller.go
Expand Up @@ -185,7 +185,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteFilesystem() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mirror/controller.go
Expand Up @@ -158,7 +158,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller.go
Expand Up @@ -168,7 +168,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/controller.go
Expand Up @@ -184,7 +184,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/realm/controller.go
Expand Up @@ -145,7 +145,7 @@ func (r *ReconcileObjectRealm) reconcile(request reconcile.Request) (reconcile.R
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectRealm.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/user/controller.go
Expand Up @@ -159,7 +159,7 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteUser() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zone/controller.go
Expand Up @@ -141,7 +141,7 @@ func (r *ReconcileObjectZone) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zonegroup/controller.go
Expand Up @@ -139,7 +139,7 @@ func (r *ReconcileObjectZoneGroup) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectZoneGroup.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/pool/controller.go
Expand Up @@ -165,7 +165,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down

0 comments on commit fd10d98

Please sign in to comment.