Skip to content

Commit

Permalink
pool: file: object: clean up stop health checkers
Browse files Browse the repository at this point in the history
Clean up the code used to stop health checkers for all controllers
(pool, file, object). Health checkers should now be stopped when
removing the finalizer for a forced deletion when the CephCluster does
not exist. This prevents leaking a running health checker for a resource
that is going to be imminently removed.

Also tidy the health checker stopping code so that it is similar for all
3 controllers. Of note, the object controller now uses namespace and
name for the object health checker, which would create a problem for
users who create a CephObjectStore with the same name in different
namespaces.

Signed-off-by: Blaine Gardner <blaine.gardner@redhat.com>
  • Loading branch information
BlaineEXE committed Nov 19, 2021
1 parent b9faa9d commit 03ba7de
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 67 deletions.
31 changes: 22 additions & 9 deletions pkg/operator/ceph/file/controller.go
Expand Up @@ -167,6 +167,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephFilesystem)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -194,6 +198,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
if err != nil {
Expand Down Expand Up @@ -241,13 +248,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs still in the map, we must remove it during CR deletion
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
Expand Down Expand Up @@ -473,6 +474,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
return nil
}

func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
func fsChannelKeyName(f *cephv1.CephFilesystem) string {
return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
}
112 changes: 64 additions & 48 deletions pkg/operator/ceph/object/controller.go
Expand Up @@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.stopMonitoring(cephObjectStore)
return reconcile.Result{}, cephObjectStore, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
if err != nil {
Expand All @@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreContexts[cephObjectStore.Name]
_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
if !ok {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
Expand All @@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
r.clusterInfo.CephVersion = runningCephVersion
r.clusterInfo.Context = r.opManagerContext

if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
// get the latest version of the object now that the health checker is stopped
err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}

objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}

opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}

deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.objectStoreContexts[cephObjectStore.Name].internalCancel()
r.objectStoreContexts[cephObjectStore.Name].started = false

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove object store from the map
delete(r.objectStoreContexts, cephObjectStore.Name)
}
// get the latest version of the object to check dependencies
err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
Expand Down Expand Up @@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
if r.objectStoreContexts[channelKey].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
Expand All @@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true
r.objectStoreContexts[channelKey].started = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreContexts[channelKey]
if monitoringContextExists {
// stop the monitoring routine
r.objectStoreContexts[channelKey].internalCancel()

// remove the monitoring routine from the map
delete(r.objectStoreContexts, channelKey)
}
}
36 changes: 26 additions & 10 deletions pkg/operator/ceph/pool/controller.go
Expand Up @@ -38,6 +38,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephBlockPool)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
if err != nil {
Expand All @@ -196,7 +204,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Initialize the channel for this pool
// This allows us to track multiple CephBlockPool in the same namespace
blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
_, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey]
if !blockPoolContextsExists {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
Expand All @@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
if blockPoolContextsExists {
r.cancelMirrorMonitoring(blockPoolChannelKey)
}
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
Expand Down Expand Up @@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Stop monitoring the mirroring status of this pool
if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
r.cancelMirrorMonitoring(blockPoolChannelKey)
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
Expand Down Expand Up @@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
// Cancel the context to stop the go routine
r.blockPoolContexts[cephBlockPoolName].internalCancel()
func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolContexts[channelKey]
if poolContextExists {
// Cancel the context to stop the go routine
r.blockPoolContexts[channelKey].internalCancel()

// Remove ceph block pool from the map
delete(r.blockPoolContexts, cephBlockPoolName)
// Remove ceph block pool from the map
delete(r.blockPoolContexts, channelKey)
}
}

0 comments on commit 03ba7de

Please sign in to comment.