pool: file: object: clean up health checkers for both types of deletion (backport #9094) #9213

Closed
wants to merge 1 commit into from
27 changes: 25 additions & 2 deletions pkg/operator/ceph/file/controller.go
@@ -177,6 +177,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephFilesystem)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -204,6 +208,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.client, cephFilesystem)
if err != nil {
@@ -254,13 +261,17 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs is still in the map, we must remove it during CR deletion
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephFilesystem)
@@ -458,6 +469,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
return nil
}

func fsChannelKeyName(f *cephv1.CephFilesystem) string {
return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsContextsExists := r.fsContexts[fsChannelKeyName(cephFilesystem)]
if fsContextsExists {
// Cancel the context to stop the mirroring status
r.fsContexts[fsChannelKeyName(cephFilesystem)].internalCancel()

// Remove ceph fs from the map
delete(r.fsContexts, fsChannelKeyName(cephFilesystem))
}
}
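
The change above replaces the filesystem controller's per-resource stop channels with cancellable contexts (fsContexts, keyed by fsChannelKeyName) and routes every deletion path through cancelMirrorMonitoring. Below is a minimal, self-contained sketch of that pattern, not Rook's actual code: monitorRegistry, monitorEntry, start, stop, and runHealthCheck are illustrative names, and the map is keyed by a plain "namespace/name" string to keep the example free of Kubernetes dependencies.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// monitorEntry plays the role of Rook's per-resource health struct: the cancel
// func is the only handle needed to stop the checker goroutine.
type monitorEntry struct {
	internalCtx    context.Context
	internalCancel context.CancelFunc
}

// monitorRegistry tracks one entry per resource, keyed by "namespace/name".
type monitorRegistry struct {
	mu      sync.Mutex
	entries map[string]*monitorEntry
}

func newMonitorRegistry() *monitorRegistry {
	return &monitorRegistry{entries: map[string]*monitorEntry{}}
}

// start derives a child context from the parent and launches the checker.
// It is a no-op if a checker is already registered for the key.
func (m *monitorRegistry) start(parent context.Context, key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.entries[key]; ok {
		return
	}
	ctx, cancel := context.WithCancel(parent)
	m.entries[key] = &monitorEntry{internalCtx: ctx, internalCancel: cancel}
	go runHealthCheck(ctx, key)
}

// stop cancels the checker and removes it from the map. Like
// cancelMirrorMonitoring above, it is a no-op when nothing is registered.
func (m *monitorRegistry) stop(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	entry, ok := m.entries[key]
	if !ok {
		return
	}
	entry.internalCancel()
	delete(m.entries, key)
}

// runHealthCheck stands in for the mirroring checker loop: work on a ticker,
// exit promptly once the context is cancelled.
func runHealthCheck(ctx context.Context, key string) {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("checker for %s stopped: %v\n", key, ctx.Err())
			return
		case <-ticker.C:
			fmt.Printf("checking health of %s\n", key)
		}
	}
}

func main() {
	reg := newMonitorRegistry()
	reg.start(context.Background(), "rook-ceph/myfs")
	time.Sleep(500 * time.Millisecond)
	reg.stop("rook-ceph/myfs") // stops the goroutine
	reg.stop("rook-ceph/myfs") // second call is a harmless no-op
	time.Sleep(100 * time.Millisecond)
}

Deriving each checker's context from the operator's context (r.opManagerContext in the diff) also means an operator shutdown cancels every checker without extra bookkeeping, which per-resource stop channels could not offer for free.
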
90 changes: 90 additions & 0 deletions pkg/operator/ceph/object/controller.go
@@ -184,6 +184,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.stopMonitoring(cephObjectStore)
return reconcile.Result{}, cephObjectStore, nil
}
// Error reading the object - requeue the request.
@@ -212,6 +216,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.client, cephObjectStore)
if err != nil {
@@ -228,11 +235,20 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
if !ok {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
}

@@ -257,6 +273,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if !cephObjectStore.GetDeletionTimestamp().IsZero() {
updateStatus(r.client, request.NamespacedName, cephv1.ConditionDeleting, buildStatusInfo(cephObjectStore))

@@ -303,7 +320,49 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// Detect running Ceph version
runningCephVersion, err := cephclient.LeastUptodateDaemonVersion(r.context, r.clusterInfo, config.MonType)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to retrieve current ceph %q version", config.MonType)
}
r.clusterInfo.CephVersion = runningCephVersion
r.clusterInfo.Context = r.opManagerContext

// get the latest version of the object to check dependencies
err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephObjectStore)
@@ -517,9 +576,19 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreContexts[channelKey].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
@@ -530,10 +599,31 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[channelKey].started = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreContexts[channelKey]
if monitoringContextExists {
// stop the monitoring routine
r.objectStoreContexts[channelKey].internalCancel()

// remove the monitoring routine from the map
delete(r.objectStoreContexts, channelKey)
}
}
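
stopMonitoring is now called from three places in reconcile: when the CephObjectStore is already gone, when it is force-deleted without a running cluster, and during normal deletion once dependents are cleared. The standalone sketch below (not Rook code) illustrates why repeated cancellation from multiple paths is harmless, whereas the old channel-based teardown needed guarding: a context.CancelFunc is idempotent, but closing an already-closed channel panics.

package main

import (
	"context"
	"fmt"
)

func main() {
	// A context.CancelFunc may be called any number of times; later calls are no-ops.
	_, cancel := context.WithCancel(context.Background())
	cancel()
	cancel() // harmless
	fmt.Println("cancel called twice without incident")

	// Closing a channel twice panics at runtime, which is why channel-based
	// cleanup had to check map membership before closing the stop channel.
	stopChan := make(chan struct{})
	close(stopChan)
	func() {
		defer func() {
			if r := recover(); r != nil {
				fmt.Println("second close panicked:", r)
			}
		}()
		close(stopChan)
	}()
}
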
46 changes: 46 additions & 0 deletions pkg/operator/ceph/pool/controller.go
@@ -39,6 +39,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -150,6 +151,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
r.cancelMirrorMonitoring(cephBlockPool)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -177,6 +182,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephBlockPool)
if err != nil {
@@ -199,22 +207,36 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Initialize the channel for this pool
// This allows us to track multiple CephBlockPool in the same namespace
blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
_, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey]
if !blockPoolContextsExists {
internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
r.blockPoolContexts[blockPoolChannelKey] = &blockPoolHealth{
internalCtx: internalCtx,
internalCancel: internalCancel,
}
}

// DELETE: the CR was deleted
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -327,8 +349,13 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
updateStatus(r.client, request.NamespacedName, cephv1.ConditionReady, nil)

// Stop monitoring the mirroring status of this pool
if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
@@ -417,10 +444,29 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolContexts[channelKey]
if poolContextExists {
// Cancel the context to stop the go routine
r.blockPoolContexts[channelKey].internalCancel()

// Remove ceph block pool from the map
delete(r.blockPoolContexts, channelKey)
}
}
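
All three controllers now build their map keys from types.NamespacedName instead of an ad-hoc fmt.Sprintf, so every key takes the canonical "namespace/name" form. A small sketch of that helper follows, assuming the k8s.io/apimachinery module is on the import path; fakeBlockPool is a stand-in for the real *cephv1.CephBlockPool.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// fakeBlockPool stands in for *cephv1.CephBlockPool; only the name fields matter here.
type fakeBlockPool struct {
	Namespace string
	Name      string
}

// blockPoolChannelKeyName mirrors the helpers added in this change: the map key
// is the NamespacedName string, i.e. "namespace/name".
func blockPoolChannelKeyName(p *fakeBlockPool) string {
	return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

func main() {
	p := &fakeBlockPool{Namespace: "rook-ceph", Name: "replicapool"}
	fmt.Println(blockPoolChannelKeyName(p)) // prints "rook-ceph/replicapool"
}

Using the namespaced form also avoids collisions between same-named resources in different namespaces, which a key based only on the resource name (as the object store code previously used with cephObjectStore.Name) could not.
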