Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: file: object: clean up health checkers for both types of deletion #9094

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 22 additions & 9 deletions pkg/operator/ceph/file/controller.go
Expand Up @@ -167,6 +167,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephFilesystem)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -194,6 +198,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
if err != nil {
Expand Down Expand Up @@ -241,13 +248,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs still in the map, we must remove it during CR deletion
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephFilesystem)
Expand Down Expand Up @@ -473,6 +474,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
return nil
}

func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
func fsChannelKeyName(f *cephv1.CephFilesystem) string {
return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
}
112 changes: 64 additions & 48 deletions pkg/operator/ceph/object/controller.go
Expand Up @@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.stopMonitoring(cephObjectStore)
return reconcile.Result{}, cephObjectStore, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
if err != nil {
Expand All @@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreContexts[cephObjectStore.Name]
_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
if !ok {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
Expand All @@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
r.clusterInfo.CephVersion = runningCephVersion
r.clusterInfo.Context = r.opManagerContext

if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
// get the latest version of the object now that the health checker is stopped
Comment on lines -239 to -241
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this was ever actually necessary to stop the health checker to look for dependents. At worst, there could be an error checking that causes the reconcile to re-run, but I don't think that is possible unless we have multiple simultaneous reconciles in the future. Therefore, I remove the if/if check and un-indent the stuff below.

err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}

objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}

opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}

deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.objectStoreContexts[cephObjectStore.Name].internalCancel()
r.objectStoreContexts[cephObjectStore.Name].started = false

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove object store from the map
delete(r.objectStoreContexts, cephObjectStore.Name)
}
// get the latest version of the object to check dependencies
err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
Expand Down Expand Up @@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The object store used to just use the name of the store for the key, which would cause conflicts for stores of the same name in different namespaces.

if r.objectStoreContexts[channelKey].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
Expand All @@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true
r.objectStoreContexts[channelKey].started = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreContexts[channelKey]
if monitoringContextExists {
// stop the monitoring routine
r.objectStoreContexts[channelKey].internalCancel()

// remove the monitoring routine from the map
delete(r.objectStoreContexts, channelKey)
}
}
36 changes: 26 additions & 10 deletions pkg/operator/ceph/pool/controller.go
Expand Up @@ -38,6 +38,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephBlockPool)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
if err != nil {
Expand All @@ -196,7 +204,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Initialize the channel for this pool
// This allows us to track multiple CephBlockPool in the same namespace
blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
_, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey]
if !blockPoolContextsExists {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
Expand All @@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
if blockPoolContextsExists {
r.cancelMirrorMonitoring(blockPoolChannelKey)
}
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
Expand Down Expand Up @@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Stop monitoring the mirroring status of this pool
if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
r.cancelMirrorMonitoring(blockPoolChannelKey)
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
Expand Down Expand Up @@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
// Cancel the context to stop the go routine
r.blockPoolContexts[cephBlockPoolName].internalCancel()
Comment on lines -406 to -408
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the diff looks like I deleted this :, I extended this cancelMonitoring pattern to file and object controllers.

func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolContexts[channelKey]
if poolContextExists {
// Cancel the context to stop the go routine
r.blockPoolContexts[channelKey].internalCancel()

// Remove ceph block pool from the map
delete(r.blockPoolContexts, cephBlockPoolName)
// Remove ceph block pool from the map
delete(r.blockPoolContexts, channelKey)
}
}