Skip to content

Commit

Permalink
Merge pull request #9041 from travisn/cluster-cleanup
Browse files Browse the repository at this point in the history
core: Treat cluster as not existing if the cleanup policy is set
  • Loading branch information
travisn committed Oct 27, 2021
2 parents f55479b + fd10d98 commit 33072bf
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/client/controller.go
Expand Up @@ -152,7 +152,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller.go
Expand Up @@ -172,7 +172,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
11 changes: 8 additions & 3 deletions pkg/operator/ceph/controller/controller_utils.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -114,7 +113,7 @@ func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, control
}

// IsReadyToReconcile determines if a controller is ready to reconcile or not
func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
func IsReadyToReconcile(ctx context.Context, c client.Client, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
cephClusterExists := false

// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
Expand All @@ -130,9 +129,15 @@ func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *cl
logger.Debugf("%q: no CephCluster resource found in namespace %q", controllerName, namespacedName.Namespace)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
cephClusterExists = true
cephCluster = clusterList.Items[0]

// If the cluster has a cleanup policy to destroy the cluster and it has been marked for deletion, treat it as if it does not exist
if cephCluster.Spec.CleanupPolicy.HasDataDirCleanPolicy() && !cephCluster.DeletionTimestamp.IsZero() {
logger.Infof("%q: CephCluster %q has a destructive cleanup policy, allowing resources to be deleted", controllerName, namespacedName)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}

cephClusterExists = true
logger.Debugf("%q: CephCluster resource %q found in namespace %q", controllerName, cephCluster.Name, namespacedName.Namespace)

// read the CR status of the cluster
Expand Down
91 changes: 91 additions & 0 deletions pkg/operator/ceph/controller/controller_utils_test.go
Expand Up @@ -17,12 +17,18 @@ limitations under the License.
package controller

import (
ctx "context"
"testing"
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/util/exec"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func CreateTestClusterFromStatusDetails(details map[string]cephv1.CephHealthMessage) cephv1.CephCluster {
Expand Down Expand Up @@ -85,3 +91,88 @@ func TestSetCephCommandsTimeout(t *testing.T) {
SetCephCommandsTimeout(map[string]string{"ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS": "1"})
assert.Equal(t, 1*time.Second, exec.CephCommandsTimeout)
}

func TestIsReadyToReconcile(t *testing.T) {
scheme := scheme.Scheme
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

controllerName := "testing"
clusterName := types.NamespacedName{Name: "mycluster", Namespace: "myns"}

t.Run("non-existent cephcluster", func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("valid cephcluster", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("deleted cephcluster with no cleanup policy", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
}

objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("cephcluster with cleanup policy when not deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
})

t.Run("cephcluster with cleanup policy when deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
})
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller.go
Expand Up @@ -185,7 +185,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteFilesystem() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mirror/controller.go
Expand Up @@ -158,7 +158,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller.go
Expand Up @@ -168,7 +168,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/controller.go
Expand Up @@ -184,7 +184,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/realm/controller.go
Expand Up @@ -145,7 +145,7 @@ func (r *ReconcileObjectRealm) reconcile(request reconcile.Request) (reconcile.R
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectRealm.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/user/controller.go
Expand Up @@ -159,7 +159,7 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteUser() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zone/controller.go
Expand Up @@ -141,7 +141,7 @@ func (r *ReconcileObjectZone) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zonegroup/controller.go
Expand Up @@ -139,7 +139,7 @@ func (r *ReconcileObjectZoneGroup) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectZoneGroup.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/pool/controller.go
Expand Up @@ -165,7 +165,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down

0 comments on commit 33072bf

Please sign in to comment.