Skip to content

Commit

Permalink
Merge pull request #9053 from rook/mergify/bp/release-1.7/pr-9041
Browse files Browse the repository at this point in the history
core: Treat cluster as not existing if the cleanup policy is set (backport #9041)
  • Loading branch information
mergify[bot] committed Oct 27, 2021
2 parents 28f15ef + 6e6da74 commit 90103f5
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/client/controller.go
Expand Up @@ -156,7 +156,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller.go
Expand Up @@ -191,7 +191,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
10 changes: 8 additions & 2 deletions pkg/operator/ceph/controller/controller_utils.go
Expand Up @@ -126,7 +126,7 @@ func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, control
}

// IsReadyToReconcile determines if a controller is ready to reconcile or not
func IsReadyToReconcile(c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
func IsReadyToReconcile(c client.Client, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
cephClusterExists := false

// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
Expand All @@ -142,9 +142,15 @@ func IsReadyToReconcile(c client.Client, clustercontext *clusterd.Context, names
logger.Debugf("%q: no CephCluster resource found in namespace %q", controllerName, namespacedName.Namespace)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
cephClusterExists = true
cephCluster = clusterList.Items[0]

// If the cluster has a cleanup policy to destroy the cluster and it has been marked for deletion, treat it as if it does not exist
if cephCluster.Spec.CleanupPolicy.HasDataDirCleanPolicy() && !cephCluster.DeletionTimestamp.IsZero() {
logger.Infof("%q: CephCluster %q has a destructive cleanup policy, allowing resources to be deleted", controllerName, namespacedName)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}

cephClusterExists = true
logger.Debugf("%q: CephCluster resource %q found in namespace %q", controllerName, cephCluster.Name, namespacedName.Namespace)

// read the CR status of the cluster
Expand Down
93 changes: 91 additions & 2 deletions pkg/operator/ceph/controller/controller_utils_test.go
Expand Up @@ -22,12 +22,16 @@ import (
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/util/exec"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kfake "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func CreateTestClusterFromStatusDetails(details map[string]cephv1.CephHealthMessage) cephv1.CephCluster {
Expand Down Expand Up @@ -79,7 +83,7 @@ func TestCanIgnoreHealthErrStatusInReconcile(t *testing.T) {
}

func TestSetCephCommandsTimeout(t *testing.T) {
clientset := fake.NewSimpleClientset()
clientset := kfake.NewSimpleClientset()
ctx := context.TODO()
cm := &v1.ConfigMap{}
cm.Name = "rook-ceph-operator-config"
Expand All @@ -104,3 +108,88 @@ func TestSetCephCommandsTimeout(t *testing.T) {
SetCephCommandsTimeout(context)
assert.Equal(t, 1*time.Second, exec.CephCommandsTimeout)
}

func TestIsReadyToReconcile(t *testing.T) {
scheme := scheme.Scheme
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

controllerName := "testing"
clusterName := types.NamespacedName{Name: "mycluster", Namespace: "myns"}

t.Run("non-existent cephcluster", func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("valid cephcluster", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("deleted cephcluster with no cleanup policy", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
}

objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("cephcluster with cleanup policy when not deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
})

t.Run("cephcluster with cleanup policy when deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
})
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller.go
Expand Up @@ -195,7 +195,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteFilesystem() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mirror/controller.go
Expand Up @@ -177,7 +177,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller.go
Expand Up @@ -168,7 +168,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/controller.go
Expand Up @@ -203,7 +203,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/realm/controller.go
Expand Up @@ -149,7 +149,7 @@ func (r *ReconcileObjectRealm) reconcile(request reconcile.Request) (reconcile.R
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectRealm.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/user/controller.go
Expand Up @@ -163,7 +163,7 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteUser() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zone/controller.go
Expand Up @@ -144,7 +144,7 @@ func (r *ReconcileObjectZone) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zonegroup/controller.go
Expand Up @@ -142,7 +142,7 @@ func (r *ReconcileObjectZoneGroup) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectZoneGroup.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/pool/controller.go
Expand Up @@ -168,7 +168,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down

0 comments on commit 90103f5

Please sign in to comment.