Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Treat cluster as not existing if the cleanup policy is set #9041

Merged
merged 1 commit into from Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/operator/ceph/client/controller.go
Expand Up @@ -152,7 +152,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller.go
Expand Up @@ -172,7 +172,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
11 changes: 8 additions & 3 deletions pkg/operator/ceph/controller/controller_utils.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/exec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -114,7 +113,7 @@ func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, control
}

// IsReadyToReconcile determines if a controller is ready to reconcile or not
func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
func IsReadyToReconcile(ctx context.Context, c client.Client, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
cephClusterExists := false

// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
Expand All @@ -130,9 +129,15 @@ func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *cl
logger.Debugf("%q: no CephCluster resource found in namespace %q", controllerName, namespacedName.Namespace)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}
cephClusterExists = true
cephCluster = clusterList.Items[0]

// If the cluster has a cleanup policy to destroy the cluster and it has been marked for deletion, treat it as if it does not exist
if cephCluster.Spec.CleanupPolicy.HasDataDirCleanPolicy() && !cephCluster.DeletionTimestamp.IsZero() {
travisn marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("%q: CephCluster %q has a destructive cleanup policy, allowing resources to be deleted", controllerName, namespacedName)
return cephCluster, false, cephClusterExists, WaitForRequeueIfCephClusterNotReady
}

cephClusterExists = true
logger.Debugf("%q: CephCluster resource %q found in namespace %q", controllerName, cephCluster.Name, namespacedName.Namespace)

// read the CR status of the cluster
Expand Down
91 changes: 91 additions & 0 deletions pkg/operator/ceph/controller/controller_utils_test.go
Expand Up @@ -17,12 +17,18 @@ limitations under the License.
package controller

import (
ctx "context"
"testing"
"time"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/util/exec"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func CreateTestClusterFromStatusDetails(details map[string]cephv1.CephHealthMessage) cephv1.CephCluster {
Expand Down Expand Up @@ -85,3 +91,88 @@ func TestSetCephCommandsTimeout(t *testing.T) {
SetCephCommandsTimeout(map[string]string{"ROOK_CEPH_COMMANDS_TIMEOUT_SECONDS": "1"})
assert.Equal(t, 1*time.Second, exec.CephCommandsTimeout)
}

func TestIsReadyToReconcile(t *testing.T) {
scheme := scheme.Scheme
scheme.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

controllerName := "testing"
clusterName := types.NamespacedName{Name: "mycluster", Namespace: "myns"}

t.Run("non-existent cephcluster", func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("valid cephcluster", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("deleted cephcluster with no cleanup policy", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
}

objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, reconcileResult := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
assert.Equal(t, WaitForRequeueIfCephClusterNotReady, reconcileResult)
})

t.Run("cephcluster with cleanup policy when not deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.True(t, clusterExists)
})

t.Run("cephcluster with cleanup policy when deleted", func(t *testing.T) {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName.Name,
Namespace: clusterName.Namespace,
DeletionTimestamp: &metav1.Time{Time: time.Now()},
},
Spec: cephv1.ClusterSpec{
CleanupPolicy: cephv1.CleanupPolicySpec{
Confirmation: cephv1.DeleteDataDirOnHostsConfirmation,
},
}}
objects := []runtime.Object{cephCluster}
client := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objects...).Build()
c, ready, clusterExists, _ := IsReadyToReconcile(ctx.TODO(), client, clusterName, controllerName)
assert.NotNil(t, c)
assert.False(t, ready)
assert.False(t, clusterExists)
})
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller.go
Expand Up @@ -185,7 +185,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteFilesystem() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/mirror/controller.go
Expand Up @@ -158,7 +158,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller.go
Expand Up @@ -168,7 +168,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/controller.go
Expand Up @@ -184,7 +184,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteStore() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/realm/controller.go
Expand Up @@ -145,7 +145,7 @@ func (r *ReconcileObjectRealm) reconcile(request reconcile.Request) (reconcile.R
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectRealm.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/user/controller.go
Expand Up @@ -159,7 +159,7 @@ func (r *ReconcileObjectStoreUser) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deleteUser() function since everything is gone already
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zone/controller.go
Expand Up @@ -141,7 +141,7 @@ func (r *ReconcileObjectZone) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/zonegroup/controller.go
Expand Up @@ -139,7 +139,7 @@ func (r *ReconcileObjectZoneGroup) reconcile(request reconcile.Request) (reconci
}

// Make sure a CephCluster is present otherwise do nothing
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
_, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !cephObjectZoneGroup.GetDeletionTimestamp().IsZero() && !cephClusterExists {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/pool/controller.go
Expand Up @@ -165,7 +165,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
Expand Down