diff --git a/pkg/operator/ceph/file/controller.go b/pkg/operator/ceph/file/controller.go index 78344d743032..85fc15df9178 100644 --- a/pkg/operator/ceph/file/controller.go +++ b/pkg/operator/ceph/file/controller.go @@ -167,6 +167,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil if err != nil { if kerrors.IsNotFound(err) { logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.") + // If there was a previous error or if a user removed this resource's finalizer, it's + // possible Rook didn't clean up the monitoring routine for this resource. Ensure the + // routine is stopped when we see the resource is gone. + r.cancelMirrorMonitoring(cephFilesystem) return reconcile.Result{}, nil } // Error reading the object - requeue the request. @@ -194,6 +198,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil // If not, we should wait for it to be ready // This handles the case where the operator is not ready to accept Ceph command but the cluster exists if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists { + // don't leak the health checker routine if we are force deleting + r.cancelMirrorMonitoring(cephFilesystem) + // Remove finalizer err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem) if err != nil { @@ -241,13 +248,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil } // If the ceph fs still in the map, we must remove it during CR deletion - if fsContextsExists { - // Cancel the context to stop the mirroring status - r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel() - - // Remove ceph fs from the map - delete(r.fsContexts, fsChannelKeyName(cephFilesystem)) - } + r.cancelMirrorMonitoring(cephFilesystem) // Remove finalizer err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem) @@ -473,6 +474,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv return nil } -func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string { - return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name) +func fsChannelKeyName(f *cephv1.CephFilesystem) string { + return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String() +} + +// cancel mirror monitoring. This is a noop if monitoring is not running. +func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) { + _, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)] + if fsContextsExists { + // Cancel the context to stop the mirroring status + r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel() + + // Remove ceph fs from the map + delete(r.fsContexts, fsChannelKeyName(cephFilesystem)) + } } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index 633f000f1f97..7725f91d250e 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci if err != nil { if kerrors.IsNotFound(err) { logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.") + // If there was a previous error or if a user removed this resource's finalizer, it's + // possible Rook didn't clean up the monitoring routine for this resource. Ensure the + // routine is stopped when we see the resource is gone. + r.stopMonitoring(cephObjectStore) return reconcile.Result{}, cephObjectStore, nil } // Error reading the object - requeue the request. @@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci // If not, we should wait for it to be ready // This handles the case where the operator is not ready to accept Ceph command but the cluster exists if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists { + // don't leak the health checker routine if we are force deleting + r.stopMonitoring(cephObjectStore) + // Remove finalizer err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore) if err != nil { @@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci // Initialize the channel for this object store // This allows us to track multiple ObjectStores in the same namespace - _, ok := r.objectStoreContexts[cephObjectStore.Name] + _, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] if !ok { internalCtx, internalCancel := context.WithCancel(r.opManagerContext) - r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{ + r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{ internalCtx: internalCtx, internalCancel: internalCancel, } @@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci r.clusterInfo.CephVersion = runningCephVersion r.clusterInfo.Context = r.opManagerContext - if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok { - if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil { - // get the latest version of the object now that the health checker is stopped - err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore) - if err != nil { - return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String()) - } - - objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore) - if err != nil { - return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context") - } - - opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec) - if err != nil { - return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context") - } - - deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx) - if err != nil { - return reconcile.Result{}, cephObjectStore, err - } - if !deps.Empty() { - err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps) - return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err - } - reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore) - - // Cancel the context to stop monitoring the health of the object store - r.objectStoreContexts[cephObjectStore.Name].internalCancel() - r.objectStoreContexts[cephObjectStore.Name].started = false - - cfg := clusterConfig{ - context: r.context, - store: cephObjectStore, - clusterSpec: r.clusterSpec, - clusterInfo: r.clusterInfo, - } - cfg.deleteStore() - - // Remove object store from the map - delete(r.objectStoreContexts, cephObjectStore.Name) - } + // get the latest version of the object to check dependencies + err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore) + if err != nil { + return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String()) } + objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore) + if err != nil { + return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context") + } + opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec) + if err != nil { + return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context") + } + deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx) + if err != nil { + return reconcile.Result{}, cephObjectStore, err + } + if !deps.Empty() { + err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps) + return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err + } + reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore) + + // Cancel the context to stop monitoring the health of the object store + r.stopMonitoring(cephObjectStore) + + cfg := clusterConfig{ + context: r.context, + store: cephObjectStore, + clusterSpec: r.clusterSpec, + clusterInfo: r.clusterInfo, + } + cfg.deleteStore() // Remove finalizer err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore) @@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1 return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil } +func monitoringChannelKey(o *cephv1.CephObjectStore) string { + return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String() +} + func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error { + channelKey := monitoringChannelKey(objectstore) + // Start monitoring object store - if r.objectStoreContexts[objectstore.Name].started { + if r.objectStoreContexts[channelKey].started { logger.Info("external rgw endpoint monitoring go routine already running!") return nil } @@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec } logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String()) - go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx) + go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx) // Set the monitoring flag so we don't start more than one go routine - r.objectStoreContexts[objectstore.Name].started = true + r.objectStoreContexts[channelKey].started = true return nil } + +// cancel monitoring. This is a noop if monitoring is not running. +func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) { + channelKey := monitoringChannelKey(objectstore) + + _, monitoringContextExists := r.objectStoreContexts[channelKey] + if monitoringContextExists { + // stop the monitoring routine + r.objectStoreContexts[channelKey].internalCancel() + + // remove the monitoring routine from the map + delete(r.objectStoreContexts, channelKey) + } +} diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go index 0cb9cfcdc6cb..a5c253f01da5 100644 --- a/pkg/operator/ceph/pool/controller.go +++ b/pkg/operator/ceph/pool/controller.go @@ -38,6 +38,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile if err != nil { if kerrors.IsNotFound(err) { logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.") + // If there was a previous error or if a user removed this resource's finalizer, it's + // possible Rook didn't clean up the monitoring routine for this resource. Ensure the + // routine is stopped when we see the resource is gone. + r.cancelMirrorMonitoring(cephBlockPool) return reconcile.Result{}, nil } // Error reading the object - requeue the request. @@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // If not, we should wait for it to be ready // This handles the case where the operator is not ready to accept Ceph command but the cluster exists if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists { + // don't leak the health checker routine if we are force-deleting + r.cancelMirrorMonitoring(cephBlockPool) + // Remove finalizer err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool) if err != nil { @@ -196,7 +204,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // Initialize the channel for this pool // This allows us to track multiple CephBlockPool in the same namespace - blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name) + blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool) _, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey] if !blockPoolContextsExists { internalCtx, internalCancel := context.WithCancel(r.opManagerContext) @@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile if !cephBlockPool.GetDeletionTimestamp().IsZero() { // If the ceph block pool is still in the map, we must remove it during CR deletion // We must remove it first otherwise the checker will panic since the status/info will be nil - if blockPoolContextsExists { - r.cancelMirrorMonitoring(blockPoolChannelKey) - } + r.cancelMirrorMonitoring(cephBlockPool) logger.Infof("deleting pool %q", cephBlockPool.Name) err := deletePool(r.context, clusterInfo, cephBlockPool) @@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // Stop monitoring the mirroring status of this pool if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started { - r.cancelMirrorMonitoring(blockPoolChannelKey) + r.cancelMirrorMonitoring(cephBlockPool) // Reset the MirrorHealthCheckSpec checker.updateStatusMirroring(nil, nil, nil, "") } @@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient return nil } -func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) { - // Cancel the context to stop the go routine - r.blockPoolContexts[cephBlockPoolName].internalCancel() +func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string { + return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String() +} + +// cancel mirror monitoring. This is a noop if monitoring is not running. +func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) { + channelKey := blockPoolChannelKeyName(cephBlockPool) + + _, poolContextExists := r.blockPoolContexts[channelKey] + if poolContextExists { + // Cancel the context to stop the go routine + r.blockPoolContexts[channelKey].internalCancel() - // Remove ceph block pool from the map - delete(r.blockPoolContexts, cephBlockPoolName) + // Remove ceph block pool from the map + delete(r.blockPoolContexts, channelKey) + } }