From cea0b631546dd2000ce14f758cb9edde18237174 Mon Sep 17 00:00:00 2001
From: Blaine Gardner
Date: Wed, 3 Nov 2021 12:56:11 -0600
Subject: [PATCH] pool: file: object: clean up stop health checkers

Clean up the code used to stop health checkers for all controllers
(pool, file, object). Health checkers are now stopped when the finalizer
is removed for a forced deletion, i.e., when the CephCluster does not
exist. This prevents leaking a running health checker for a resource
that is about to be removed.

Also tidy the health checker stopping code so that it is similar for all
3 controllers. Of note, the object controller now keys the object health
checker by namespace and name instead of by name alone; keying by name
alone would create a problem for users who create CephObjectStores with
the same name in different namespaces.

Signed-off-by: Blaine Gardner
---
 pkg/operator/ceph/file/controller.go   |  31 +++++--
 pkg/operator/ceph/object/controller.go | 112 ++++++++++++++-----------
 pkg/operator/ceph/pool/controller.go   |  34 ++++++--
 3 files changed, 111 insertions(+), 66 deletions(-)

diff --git a/pkg/operator/ceph/file/controller.go b/pkg/operator/ceph/file/controller.go
index 78344d743032a..85fc15df9178c 100644
--- a/pkg/operator/ceph/file/controller.go
+++ b/pkg/operator/ceph/file/controller.go
@@ -167,6 +167,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
 	if err != nil {
 		if kerrors.IsNotFound(err) {
 			logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
+			// If there was a previous error or if a user removed this resource's finalizer, it's
+			// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
+			// routine is stopped when we see the resource is gone.
+			r.cancelMirrorMonitoring(cephFilesystem)
 			return reconcile.Result{}, nil
 		}
 		// Error reading the object - requeue the request.
@@ -194,6 +198,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
 	// If not, we should wait for it to be ready
 	// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
 	if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
+		// don't leak the health checker routine if we are force deleting
+		r.cancelMirrorMonitoring(cephFilesystem)
+
 		// Remove finalizer
 		err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
 		if err != nil {
@@ -241,13 +248,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
 		}
 
 		// If the ceph fs still in the map, we must remove it during CR deletion
-		if fsContextsExists {
-			// Cancel the context to stop the mirroring status
-			r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()
-
-			// Remove ceph fs from the map
-			delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
-		}
+		r.cancelMirrorMonitoring(cephFilesystem)
 
 		// Remove finalizer
 		err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
@@ -473,6 +474,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
 	return nil
 }
 
-func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
-	return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
+func fsChannelKeyName(f *cephv1.CephFilesystem) string {
+	return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
+}
+
+// cancel mirror monitoring. This is a noop if monitoring is not running.
+func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
+	_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
+	if fsContextsExists {
+		// Cancel the context to stop the mirroring status
+		r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()
+
+		// Remove ceph fs from the map
+		delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
+	}
 }
diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go
index 633f000f1f978..7725f91d250ed 100644
--- a/pkg/operator/ceph/object/controller.go
+++ b/pkg/operator/ceph/object/controller.go
@@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
 	if err != nil {
 		if kerrors.IsNotFound(err) {
 			logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
+			// If there was a previous error or if a user removed this resource's finalizer, it's
+			// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
+			// routine is stopped when we see the resource is gone.
+			r.stopMonitoring(cephObjectStore)
 			return reconcile.Result{}, cephObjectStore, nil
 		}
 		// Error reading the object - requeue the request.
@@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
 	// If not, we should wait for it to be ready
 	// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
 	if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
+		// don't leak the health checker routine if we are force deleting
+		r.stopMonitoring(cephObjectStore)
+
 		// Remove finalizer
 		err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
 		if err != nil {
@@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
 
 	// Initialize the channel for this object store
 	// This allows us to track multiple ObjectStores in the same namespace
-	_, ok := r.objectStoreContexts[cephObjectStore.Name]
+	_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
 	if !ok {
 		internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
-		r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
+		r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
 			internalCtx:    internalCtx,
 			internalCancel: internalCancel,
 		}
@@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
 		r.clusterInfo.CephVersion = runningCephVersion
 		r.clusterInfo.Context = r.opManagerContext
 
-		if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
-			if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
-				// get the latest version of the object now that the health checker is stopped
-				err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
-				if err != nil {
-					return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
-				}
-
-				objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
-				if err != nil {
-					return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
-				}
-
-				opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
-				if err != nil {
-					return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
-				}
-
-				deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
-				if err != nil {
-					return reconcile.Result{}, cephObjectStore, err
-				}
-				if !deps.Empty() {
-					err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
-					return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
-				}
-				reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)
-
-				// Cancel the context to stop monitoring the health of the object store
-				r.objectStoreContexts[cephObjectStore.Name].internalCancel()
-				r.objectStoreContexts[cephObjectStore.Name].started = false
-
-				cfg := clusterConfig{
-					context:     r.context,
-					store:       cephObjectStore,
-					clusterSpec: r.clusterSpec,
-					clusterInfo: r.clusterInfo,
-				}
-				cfg.deleteStore()
-
-				// Remove object store from the map
-				delete(r.objectStoreContexts, cephObjectStore.Name)
-			}
-		}
+		// get the latest version of the object to check dependencies
+		err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
+		if err != nil {
+			return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
+		}
+		objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
+		if err != nil {
+			return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
+		}
+		opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
+		if err != nil {
+			return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
+		}
+		deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
+		if err != nil {
+			return reconcile.Result{}, cephObjectStore, err
+		}
+		if !deps.Empty() {
+			err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
+			return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
+		}
+		reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)
+
+		// Cancel the context to stop monitoring the health of the object store
+		r.stopMonitoring(cephObjectStore)
+
+		cfg := clusterConfig{
+			context:     r.context,
+			store:       cephObjectStore,
+			clusterSpec: r.clusterSpec,
+			clusterInfo: r.clusterInfo,
+		}
+		cfg.deleteStore()
 
 		// Remove finalizer
 		err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
@@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
 	return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
 }
 
+func monitoringChannelKey(o *cephv1.CephObjectStore) string {
+	return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
+}
+
 func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
+	channelKey := monitoringChannelKey(objectstore)
+
 	// Start monitoring object store
-	if r.objectStoreContexts[objectstore.Name].started {
+	if r.objectStoreContexts[channelKey].started {
 		logger.Info("external rgw endpoint monitoring go routine already running!")
 		return nil
 	}
@@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
 	}
 
 	logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
-	go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
+	go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)
 
 	// Set the monitoring flag so we don't start more than one go routine
-	r.objectStoreContexts[objectstore.Name].started = true
+	r.objectStoreContexts[channelKey].started = true
 
 	return nil
 }
+
+// cancel monitoring. This is a noop if monitoring is not running.
+func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
+	channelKey := monitoringChannelKey(objectstore)
+
+	_, monitoringContextExists := r.objectStoreContexts[channelKey]
+	if monitoringContextExists {
+		// stop the monitoring routine
+		r.objectStoreContexts[channelKey].internalCancel()
+
+		// remove the monitoring routine from the map
+		delete(r.objectStoreContexts, channelKey)
+	}
+}
diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go
index 0cb9cfcdc6cb0..9d614342daa25 100644
--- a/pkg/operator/ceph/pool/controller.go
+++ b/pkg/operator/ceph/pool/controller.go
@@ -38,6 +38,7 @@ import (
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 	if err != nil {
 		if kerrors.IsNotFound(err) {
 			logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
+			// If there was a previous error or if a user removed this resource's finalizer, it's
+			// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
+			// routine is stopped when we see the resource is gone.
+			r.cancelMirrorMonitoring(cephBlockPool)
 			return reconcile.Result{}, nil
 		}
 		// Error reading the object - requeue the request.
@@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 	// If not, we should wait for it to be ready
 	// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
 	if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
+		// don't leak the health checker routine if we are force-deleting
+		r.cancelMirrorMonitoring(cephBlockPool)
+
 		// Remove finalizer
 		err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
 		if err != nil {
@@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 	if !cephBlockPool.GetDeletionTimestamp().IsZero() {
 		// If the ceph block pool is still in the map, we must remove it during CR deletion
 		// We must remove it first otherwise the checker will panic since the status/info will be nil
-		if blockPoolContextsExists {
-			r.cancelMirrorMonitoring(blockPoolChannelKey)
-		}
+		r.cancelMirrorMonitoring(cephBlockPool)
 
 		logger.Infof("deleting pool %q", cephBlockPool.Name)
 		err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
 
 		// Stop monitoring the mirroring status of this pool
 		if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
-			r.cancelMirrorMonitoring(blockPoolChannelKey)
+			r.cancelMirrorMonitoring(cephBlockPool)
 			// Reset the MirrorHealthCheckSpec
 			checker.updateStatusMirroring(nil, nil, nil, "")
 		}
@@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
 	return nil
 }
 
-func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
-	// Cancel the context to stop the go routine
-	r.blockPoolContexts[cephBlockPoolName].internalCancel()
+func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
+	return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
+}
+
+// cancel mirror monitoring. This is a noop if monitoring is not running.
+func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
+	channelKey := blockPoolChannelKeyName(cephBlockPool)
+
+	_, poolContextExists := r.blockPoolContexts[channelKey]
+	if poolContextExists {
+		// Cancel the context to stop the go routine
+		r.blockPoolContexts[channelKey].internalCancel()
 
-	// Remove ceph block pool from the map
-	delete(r.blockPoolContexts, cephBlockPoolName)
+		// Remove ceph block pool from the map
+		delete(r.blockPoolContexts, channelKey)
+	}
 }
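
All three stop helpers introduced above follow the same pattern: look up a cancellable
context in a map keyed by the resource's namespace and name, cancel it if present, and
delete the map entry so that repeated calls are no-ops. The standalone Go sketch below
illustrates that pattern only; it is not part of this patch, and the monitorEntry type
and the startMonitoring/stopMonitoring helpers here are hypothetical simplifications of
Rook's objectStoreHealth and blockPoolContexts bookkeeping.

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// monitorEntry is a simplified stand-in for the per-resource health-check state:
// a cancellable context used by a monitoring goroutine.
type monitorEntry struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// monitors is keyed by "<namespace>/<name>" so resources with the same name in
// different namespaces never collide.
var monitors = map[string]*monitorEntry{}

func key(namespace, name string) string {
	return types.NamespacedName{Namespace: namespace, Name: name}.String()
}

// startMonitoring registers a cancellable context for the resource unless one
// is already running.
func startMonitoring(parent context.Context, namespace, name string) {
	k := key(namespace, name)
	if _, ok := monitors[k]; ok {
		return // already running
	}
	ctx, cancel := context.WithCancel(parent)
	monitors[k] = &monitorEntry{ctx: ctx, cancel: cancel}
	// a real controller would start its health-check goroutine with ctx here
}

// stopMonitoring cancels the resource's context and forgets it.
// It is a no-op if monitoring was never started or was already stopped.
func stopMonitoring(namespace, name string) {
	k := key(namespace, name)
	if entry, ok := monitors[k]; ok {
		entry.cancel()
		delete(monitors, k)
	}
}

func main() {
	startMonitoring(context.Background(), "rook-ceph", "my-store")
	stopMonitoring("rook-ceph", "my-store")
	stopMonitoring("rook-ceph", "my-store") // safe: no-op the second time
	fmt.Println("remaining monitors:", len(monitors))
}

Keying the map with types.NamespacedName.String() ("namespace/name") is what lets two
resources with the same name in different namespaces get independent monitoring routines,
which is the behavior change called out for the object controller in the commit message.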