
Merge pull request #9094 from BlaineEXE/clean-up-resource-health-checkers-when-removing-finalizers

pool: file: object: clean up health checkers for both types of deletion
BlaineEXE committed Nov 19, 2021
2 parents b9faa9d + 03ba7de · commit fcd0d90
Showing 3 changed files with 112 additions and 67 deletions.
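
All three controllers (file, object, pool) converge on the same fix: each keeps a map of cancellable contexts for its per-resource health-check goroutine, and every deletion path now calls an idempotent helper that cancels the context and drops the map entry if one exists. The sketch below illustrates that pattern in isolation; it is not the Rook code, and the Reconciler, healthContext, startMonitoring, and stopMonitoring names are placeholders.

// Minimal sketch of the idempotent health-checker cleanup pattern; all names
// here are illustrative, not the actual Rook types or functions.
package monitor

import (
	"context"
	"log"

	"k8s.io/apimachinery/pkg/types"
)

type healthContext struct {
	internalCtx    context.Context
	internalCancel context.CancelFunc
	started        bool
}

type Reconciler struct {
	opManagerContext context.Context
	healthContexts   map[string]*healthContext
}

// channelKey keys the map by "namespace/name" so same-named resources in
// different namespaces never collide.
func channelKey(nsName types.NamespacedName) string {
	return nsName.String()
}

// startMonitoring lazily creates a cancellable context for the resource and
// starts the health-check goroutine at most once.
func (r *Reconciler) startMonitoring(nsName types.NamespacedName, check func(context.Context)) {
	key := channelKey(nsName)
	if _, ok := r.healthContexts[key]; !ok {
		ctx, cancel := context.WithCancel(r.opManagerContext)
		r.healthContexts[key] = &healthContext{internalCtx: ctx, internalCancel: cancel}
	}
	hc := r.healthContexts[key]
	if hc.started {
		log.Printf("health checker for %s already running", key)
		return
	}
	hc.started = true
	go check(hc.internalCtx) // the loop is expected to exit when its context is cancelled
}

// stopMonitoring cancels and forgets the health checker. It is a no-op when
// monitoring was never started, so it is safe to call from every deletion
// path: normal finalizer-driven deletion, force deletion when the CephCluster
// is gone, and the "resource not found" branch.
func (r *Reconciler) stopMonitoring(nsName types.NamespacedName) {
	key := channelKey(nsName)
	if _, ok := r.healthContexts[key]; ok {
		r.healthContexts[key].internalCancel()
		delete(r.healthContexts, key)
	}
}

Calling the helper from the "resource not found" branch covers the case called out in the new comments: if a previous reconcile errored out or a user removed the finalizer by hand, the goroutine could otherwise outlive its resource.
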
31 changes: 22 additions & 9 deletions pkg/operator/ceph/file/controller.go
@@ -167,6 +167,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephFilesystem)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -194,6 +198,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
if err != nil {
@@ -241,13 +248,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs still in the map, we must remove it during CR deletion
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
@@ -473,6 +474,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
return nil
}

func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
func fsChannelKeyName(f *cephv1.CephFilesystem) string {
return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
}
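
One side effect of the fsChannelKeyName change above: the key moves from fmt.Sprintf("%s-%s", namespace, name) to types.NamespacedName{...}.String(), i.e. from "namespace-name" to "namespace/name". Because "/" is not a legal character in Kubernetes names, the new key cannot collide when a namespace or name itself contains a hyphen.
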
112 changes: 64 additions & 48 deletions pkg/operator/ceph/object/controller.go
@@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.stopMonitoring(cephObjectStore)
return reconcile.Result{}, cephObjectStore, nil
}
// Error reading the object - requeue the request.
@@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
if err != nil {
@@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreContexts[cephObjectStore.Name]
_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
if !ok {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
@@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
r.clusterInfo.CephVersion = runningCephVersion
r.clusterInfo.Context = r.opManagerContext

if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
// get the latest version of the object now that the health checker is stopped
err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}

objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}

opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}

deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.objectStoreContexts[cephObjectStore.Name].internalCancel()
r.objectStoreContexts[cephObjectStore.Name].started = false

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove object store from the map
delete(r.objectStoreContexts, cephObjectStore.Name)
}
// get the latest version of the object to check dependencies
err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
@@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
if r.objectStoreContexts[channelKey].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
@@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true
r.objectStoreContexts[channelKey].started = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreContexts[channelKey]
if monitoringContextExists {
// stop the monitoring routine
r.objectStoreContexts[channelKey].internalCancel()

// remove the monitoring routine from the map
delete(r.objectStoreContexts, channelKey)
}
}
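
The object store map key also changes: it was previously just cephObjectStore.Name, while the file and pool controllers already keyed by namespace and name. Using monitoringChannelKey (namespace/name) brings the three controllers in line and avoids clashes between object stores that share a name across namespaces.
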
36 changes: 26 additions & 10 deletions pkg/operator/ceph/pool/controller.go
@@ -38,6 +38,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephBlockPool)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
if err != nil {
@@ -196,7 +204,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Initialize the channel for this pool
// This allows us to track multiple CephBlockPool in the same namespace
blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
_, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey]
if !blockPoolContextsExists {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
@@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
if blockPoolContextsExists {
r.cancelMirrorMonitoring(blockPoolChannelKey)
}
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Stop monitoring the mirroring status of this pool
if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
r.cancelMirrorMonitoring(blockPoolChannelKey)
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
@@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
// Cancel the context to stop the go routine
r.blockPoolContexts[cephBlockPoolName].internalCancel()
func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolContexts[channelKey]
if poolContextExists {
// Cancel the context to stop the go routine
r.blockPoolContexts[channelKey].internalCancel()

// Remove ceph block pool from the map
delete(r.blockPoolContexts, cephBlockPoolName)
// Remove ceph block pool from the map
delete(r.blockPoolContexts, channelKey)
}
}
