pool: file: object: clean up stop health checkers
Clean up the code used to stop health checkers in all three controllers
(pool, file, object). Health checkers are now also stopped when the
finalizer is removed during a forced deletion, i.e. when the CephCluster
no longer exists. This prevents leaking a running health checker for a
resource that is about to be removed.

Also tidy the health checker stopping code so that it is similar across
all 3 controllers. Of note, the object controller now keys the object
health checker by namespace and name rather than by name alone; keying
by name only would create a problem for users who create CephObjectStores
with the same name in different namespaces.

Signed-off-by: Blaine Gardner <blaine.gardner@redhat.com>
BlaineEXE committed Nov 3, 2021
commit 674132f (1 parent: 01bb0c1)
Showing 3 changed files with 96 additions and 64 deletions.
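
Across all three controllers the deletion path now follows the same shape: cancel the health-check goroutine's context and drop it from the tracking map, then remove the finalizer. The standalone sketch below illustrates that pattern with hypothetical names (healthContext, exampleReconciler); it is not Rook's actual reconciler code.

package main

import (
	"context"
	"fmt"
)

// healthContext tracks one resource's health-check goroutine (hypothetical type, for illustration).
type healthContext struct {
	internalCtx    context.Context
	internalCancel context.CancelFunc
}

// exampleReconciler mimics the map-of-contexts shape the three controllers share.
type exampleReconciler struct {
	healthContexts map[string]*healthContext
}

// stopMonitoring cancels and forgets the checker for key; it is a no-op if none is running.
func (r *exampleReconciler) stopMonitoring(key string) {
	if hc, ok := r.healthContexts[key]; ok {
		hc.internalCancel()           // stop the monitoring goroutine
		delete(r.healthContexts, key) // forget the stopped checker
	}
}

func main() {
	r := &exampleReconciler{healthContexts: map[string]*healthContext{}}
	ctx, cancel := context.WithCancel(context.Background())
	r.healthContexts["rook-ceph/my-store"] = &healthContext{internalCtx: ctx, internalCancel: cancel}

	// Forced-deletion path: stop the checker first, then the finalizer would be removed.
	r.stopMonitoring("rook-ceph/my-store")
	r.stopMonitoring("rook-ceph/my-store") // safe to call again: the key is gone, so nothing happens
	fmt.Println("checkers still tracked:", len(r.healthContexts)) // 0
}
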
23 changes: 16 additions & 7 deletions pkg/operator/ceph/file/controller.go
@@ -194,6 +194,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
if err != nil {
@@ -241,13 +244,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs still in the map, we must remove it during CR deletion
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
@@ -475,3 +472,15 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
}
108 changes: 60 additions & 48 deletions pkg/operator/ceph/object/controller.go
@@ -193,6 +193,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
if err != nil {
@@ -209,10 +212,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreContexts[cephObjectStore.Name]
_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
if !ok {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
@@ -236,50 +239,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
r.clusterInfo.CephVersion = runningCephVersion
r.clusterInfo.Context = r.opManagerContext

if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
// get the latest version of the object now that the health checker is stopped
err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}

objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}

opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}

deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.objectStoreContexts[cephObjectStore.Name].internalCancel()
r.objectStoreContexts[cephObjectStore.Name].started = false

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove object store from the map
delete(r.objectStoreContexts, cephObjectStore.Name)
}
// get the latest version of the object to check dependencies
err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
@@ -510,9 +502,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
if r.objectStoreContexts[channelKey].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
@@ -523,10 +521,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true
r.objectStoreContexts[channelKey].started = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreContexts[channelKey]
if monitoringContextExists {
// stop the monitoring routine
r.objectStoreContexts[channelKey].internalCancel()

// remove the monitoring routine from the map
delete(r.objectStoreContexts, channelKey)
}
}
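
As the commit message notes, the object controller now keys its health-checker map by namespace and name. monitoringChannelKey uses types.NamespacedName.String(), which joins the two with a slash, so stores that share a name but live in different namespaces get distinct entries. A small standalone illustration (not part of the commit):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	a := types.NamespacedName{Namespace: "rook-ceph", Name: "my-store"}
	b := types.NamespacedName{Namespace: "other-ns", Name: "my-store"}

	// Same store name, different namespaces: the keys differ, so each
	// CephObjectStore gets its own entry in objectStoreContexts.
	fmt.Println(a.String()) // rook-ceph/my-store
	fmt.Println(b.String()) // other-ns/my-store
}
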
29 changes: 20 additions & 9 deletions pkg/operator/ceph/pool/controller.go
@@ -174,6 +174,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
if err != nil {
@@ -210,9 +213,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
if blockPoolContextsExists {
r.cancelMirrorMonitoring(blockPoolChannelKey)
}
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -314,7 +315,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Stop monitoring the mirroring status of this pool
if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
r.cancelMirrorMonitoring(blockPoolChannelKey)
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
@@ -403,10 +404,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
// Cancel the context to stop the go routine
r.blockPoolContexts[cephBlockPoolName].internalCancel()
func blockPoolChannelKeyName(cephBlockPool *cephv1.CephBlockPool) string {
return fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolContexts[channelKey]
if poolContextExists {
// Cancel the context to stop the go routine
r.blockPoolContexts[channelKey].internalCancel()

// Remove ceph block pool from the map
delete(r.blockPoolContexts, cephBlockPoolName)
// Remove ceph block pool from the map
delete(r.blockPoolContexts, channelKey)
}
}
