Merge pull request #9416 from rook/mergify/bp/release-1.7/pr-9094
pool: file: object: clean up health checkers for both types of deletion (backport #9094)
leseb committed Dec 15, 2021
2 parents 8bbfb94 + 1af302f commit 616efb9
Showing 3 changed files with 116 additions and 68 deletions.
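
All three controllers touched here converge on the same shape: the reconciler keeps a map of per-resource health-check goroutines keyed by namespace/name, and every deletion path (resource already gone, force deletion with no cluster, normal CR deletion) calls one cancel helper that closes the resource's stop channel and drops the map entry, doing nothing when no entry exists. The sketch below illustrates that pattern only; reconciler, healthMonitor, cancelMonitoring, and the sample namespace/name are invented for illustration and are not the Rook types.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// healthMonitor tracks one background health-check goroutine (illustrative).
type healthMonitor struct {
	stopChan chan struct{}
}

// reconciler keeps one monitor per custom resource, keyed by "namespace/name".
type reconciler struct {
	monitors map[string]*healthMonitor
}

func key(namespace, name string) string {
	return types.NamespacedName{Namespace: namespace, Name: name}.String()
}

// cancelMonitoring stops the monitor for a resource. It is a no-op when no
// monitor exists, so every deletion path can call it unconditionally.
func (r *reconciler) cancelMonitoring(namespace, name string) {
	k := key(namespace, name)
	if m, ok := r.monitors[k]; ok {
		close(m.stopChan)     // tell the goroutine to exit
		delete(r.monitors, k) // forget it so a repeat call cannot double-close
	}
}

func main() {
	r := &reconciler{monitors: map[string]*healthMonitor{}}
	r.monitors[key("rook-ceph", "my-pool")] = &healthMonitor{stopChan: make(chan struct{})}

	r.cancelMonitoring("rook-ceph", "my-pool") // stops and removes the monitor
	r.cancelMonitoring("rook-ceph", "my-pool") // safe no-op: entry already gone
	fmt.Println(len(r.monitors))               // 0
}
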
33 changes: 24 additions & 9 deletions pkg/operator/ceph/file/controller.go
@@ -177,6 +177,12 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephFilesystem resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
cephFilesystem.Name = request.Name
cephFilesystem.Namespace = request.Namespace
r.cancelMirrorMonitoring(cephFilesystem)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -204,6 +210,9 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephFilesystem.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.client, cephFilesystem)
if err != nil {
@@ -254,13 +263,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil
}

// If the ceph fs is still in the map, we must remove it during CR deletion
if fsChannelExists {
// Close the channel to stop the mirroring status
close(r.fsChannels[fsChannelKeyName(cephFilesystem)].stopChan)

// Remove ceph fs from the map
delete(r.fsChannels, fsChannelKeyName(cephFilesystem))
}
r.cancelMirrorMonitoring(cephFilesystem)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephFilesystem)
@@ -458,6 +461,18 @@ func (r *ReconcileCephFilesystem) reconcileAddBoostrapPeer(cephFilesystem *cephv
return nil
}

func fsChannelKeyName(cephFilesystem *cephv1.CephFilesystem) string {
return fmt.Sprintf("%s-%s", cephFilesystem.Namespace, cephFilesystem.Name)
func fsChannelKeyName(f *cephv1.CephFilesystem) string {
return types.NamespacedName{Namespace: f.Namespace, Name: f.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephFilesystem) cancelMirrorMonitoring(cephFilesystem *cephv1.CephFilesystem) {
_, fsChannelExists := r.fsChannels[fsChannelKeyName(cephFilesystem)]
if fsChannelExists {
// Close the channel to stop the mirroring status
close(r.fsChannels[fsChannelKeyName(cephFilesystem)].stopChan)

// Remove ceph fs from the map
delete(r.fsChannels, fsChannelKeyName(cephFilesystem))
}
}
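
One detail worth noting in the fsChannelKeyName change above: the old key was built with fmt.Sprintf("%s-%s", namespace, name), while types.NamespacedName{...}.String() joins the parts with a slash, which stays unambiguous when the namespace or name itself contains dashes. A tiny comparison with made-up values:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// Hypothetical namespace and filesystem name, for illustration only.
	oldKey := fmt.Sprintf("%s-%s", "rook-ceph", "myfs")
	newKey := types.NamespacedName{Namespace: "rook-ceph", Name: "myfs"}.String()
	fmt.Println(oldKey) // rook-ceph-myfs
	fmt.Println(newKey) // rook-ceph/myfs
}
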
113 changes: 64 additions & 49 deletions pkg/operator/ceph/object/controller.go
@@ -184,6 +184,12 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
cephObjectStore.Name = request.Name
cephObjectStore.Namespace = request.Namespace
r.stopMonitoring(cephObjectStore)
return reconcile.Result{}, cephObjectStore, nil
}
// Error reading the object - requeue the request.
@@ -212,6 +218,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force deleting
r.stopMonitoring(cephObjectStore)

// Remove finalizer
err := opcontroller.RemoveFinalizer(r.client, cephObjectStore)
if err != nil {
@@ -228,9 +237,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Initialize the channel for this object store
// This allows us to track multiple ObjectStores in the same namespace
_, ok := r.objectStoreChannels[cephObjectStore.Name]
_, ok := r.objectStoreChannels[monitoringChannelKey(cephObjectStore)]
if !ok {
r.objectStoreChannels[cephObjectStore.Name] = &objectStoreHealth{
r.objectStoreChannels[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
stopChan: make(chan struct{}),
monitoringRunning: false,
}
@@ -257,53 +266,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
if !cephObjectStore.GetDeletionTimestamp().IsZero() {
updateStatus(r.client, request.NamespacedName, cephv1.ConditionDeleting, buildStatusInfo(cephObjectStore))

if ok {
select {
case <-r.objectStoreChannels[cephObjectStore.Name].stopChan:
// channel was closed
break
default:
// Close the channel to stop the healthcheck of the endpoint
close(r.objectStoreChannels[cephObjectStore.Name].stopChan)
}

// get the latest version of the object now that the health checker is stopped
err := r.client.Get(context.TODO(), request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}

objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}

opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}

deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)
// get the latest version of the object to check dependencies
err := r.client.Get(context.TODO(), request.NamespacedName, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
}
objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
}
opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
if err != nil {
return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
}
deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
if err != nil {
return reconcile.Result{}, cephObjectStore, err
}
if !deps.Empty() {
err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
}
reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)

cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()
// Cancel the context to stop monitoring the health of the object store
r.stopMonitoring(cephObjectStore)

// Remove object store from the map
delete(r.objectStoreChannels, cephObjectStore.Name)
cfg := clusterConfig{
context: r.context,
store: cephObjectStore,
clusterSpec: r.clusterSpec,
clusterInfo: r.clusterInfo,
}
cfg.deleteStore()

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephObjectStore)
@@ -517,9 +512,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func monitoringChannelKey(o *cephv1.CephObjectStore) string {
return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
channelKey := monitoringChannelKey(objectstore)

// Start monitoring object store
if r.objectStoreChannels[objectstore.Name].monitoringRunning {
if r.objectStoreChannels[channelKey].monitoringRunning {
logger.Info("external rgw endpoint monitoring go routine already running!")
return nil
}
@@ -530,10 +531,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
}

logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreChannels[objectstore.Name].stopChan)
go rgwChecker.checkObjectStore(r.objectStoreChannels[channelKey].stopChan)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreChannels[objectstore.Name].monitoringRunning = true
r.objectStoreChannels[channelKey].monitoringRunning = true

return nil
}

// cancel monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
channelKey := monitoringChannelKey(objectstore)

_, monitoringContextExists := r.objectStoreChannels[channelKey]
if monitoringContextExists {
// stop the monitoring routine
close(r.objectStoreChannels[channelKey].stopChan)

// remove the monitoring routine from the map
delete(r.objectStoreChannels, channelKey)
}
}
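
A note on the block removed from the object controller above: closing an already-closed channel panics in Go, which is what the old select around close(...) protected against when deletion was reconciled more than once. Because stopMonitoring now deletes the map entry immediately after closing, a later cancellation finds no entry and returns, so that guard is no longer needed. A minimal illustration of the old idiom and why it becomes redundant (variable names are invented):

package main

import "fmt"

func main() {
	stopChan := make(chan struct{})

	// Old idiom (as in the removed code): close only if not already closed.
	select {
	case <-stopChan:
		// already closed, nothing to do
	default:
		close(stopChan)
	}

	// New idiom: close once, then drop the map entry so no later caller can
	// reach close() again; calling close(stopChan) here would panic with
	// "close of closed channel".
	fmt.Println("monitoring stopped")
}
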
38 changes: 28 additions & 10 deletions pkg/operator/ceph/pool/controller.go
@@ -39,6 +39,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -150,6 +151,12 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
// If there was a previous error or if a user removed this resource's finalizer, it's
// possible Rook didn't clean up the monitoring routine for this resource. Ensure the
// routine is stopped when we see the resource is gone.
cephBlockPool.Name = request.Name
cephBlockPool.Namespace = request.Namespace
r.cancelMirrorMonitoring(cephBlockPool)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
@@ -177,6 +184,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
// If not, we should wait for it to be ready
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// don't leak the health checker routine if we are force-deleting
r.cancelMirrorMonitoring(cephBlockPool)

// Remove finalizer
err = opcontroller.RemoveFinalizer(r.client, cephBlockPool)
if err != nil {
@@ -199,7 +209,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Initialize the channel for this pool
// This allows us to track multiple CephBlockPool in the same namespace
blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
_, poolChannelExists := r.blockPoolChannels[blockPoolChannelKey]
if !poolChannelExists {
r.blockPoolChannels[blockPoolChannelKey] = &blockPoolHealth{
@@ -212,9 +222,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
if !cephBlockPool.GetDeletionTimestamp().IsZero() {
// If the ceph block pool is still in the map, we must remove it during CR deletion
// We must remove it first otherwise the checker will panic since the status/info will be nil
if poolChannelExists {
r.cancelMirrorMonitoring(blockPoolChannelKey)
}
r.cancelMirrorMonitoring(cephBlockPool)

logger.Infof("deleting pool %q", cephBlockPool.Name)
err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -328,7 +336,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

// Stop monitoring the mirroring status of this pool
if poolChannelExists && r.blockPoolChannels[blockPoolChannelKey].monitoringRunning {
r.cancelMirrorMonitoring(blockPoolChannelKey)
r.cancelMirrorMonitoring(cephBlockPool)
// Reset the MirrorHealthCheckSpec
checker.updateStatusMirroring(nil, nil, nil, "")
}
@@ -417,10 +425,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
return nil
}

func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
// Close the channel to stop the mirroring status
close(r.blockPoolChannels[cephBlockPoolName].stopChan)
func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
}

// cancel mirror monitoring. This is a noop if monitoring is not running.
func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
channelKey := blockPoolChannelKeyName(cephBlockPool)

_, poolContextExists := r.blockPoolChannels[channelKey]
if poolContextExists {
// Close the channel to stop the mirroring status
close(r.blockPoolChannels[channelKey].stopChan)

// Remove ceph block pool from the map
delete(r.blockPoolChannels, cephBlockPoolName)
// Remove ceph block pool from the map
delete(r.blockPoolChannels, channelKey)
}
}
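
The pool controller makes the same move as the other two controllers: cancelMirrorMonitoring now takes the CephBlockPool itself rather than a pre-computed key string, which keeps key construction in one place (blockPoolChannelKeyName) and lets every caller, including the new early-return and force-delete paths, pass the resource it already holds.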
