pool: file: object: clean up health checkers for both types of deletion #9094
@@ -165,6 +165,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
     if err != nil {
         if kerrors.IsNotFound(err) {
             logger.Debug("cephObjectStore resource not found. Ignoring since object must be deleted.")
+            // If there was a previous error or if a user removed this resource's finalizer, it's
+            // possible Rook didn't clean up the monitoring routine for this resource. Ensure the
+            // routine is stopped when we see the resource is gone.
+            r.stopMonitoring(cephObjectStore)
            return reconcile.Result{}, cephObjectStore, nil
        }
        // Error reading the object - requeue the request.
@@ -193,6 +197,9 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
    // If not, we should wait for it to be ready
    // This handles the case where the operator is not ready to accept Ceph command but the cluster exists
    if !cephObjectStore.GetDeletionTimestamp().IsZero() && !cephClusterExists {
+       // don't leak the health checker routine if we are force deleting
+       r.stopMonitoring(cephObjectStore)
+
        // Remove finalizer
        err := opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
        if err != nil {
@@ -209,10 +216,10 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

    // Initialize the channel for this object store
    // This allows us to track multiple ObjectStores in the same namespace
-   _, ok := r.objectStoreContexts[cephObjectStore.Name]
+   _, ok := r.objectStoreContexts[monitoringChannelKey(cephObjectStore)]
    if !ok {
        internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
-       r.objectStoreContexts[cephObjectStore.Name] = &objectStoreHealth{
+       r.objectStoreContexts[monitoringChannelKey(cephObjectStore)] = &objectStoreHealth{
            internalCtx:    internalCtx,
            internalCancel: internalCancel,
        }
@@ -236,50 +243,39 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci
        r.clusterInfo.CephVersion = runningCephVersion
        r.clusterInfo.Context = r.opManagerContext

-       if _, ok := r.objectStoreContexts[cephObjectStore.Name]; ok {
-           if r.objectStoreContexts[cephObjectStore.Name].internalCtx.Err() == nil {
-               // get the latest version of the object now that the health checker is stopped
-               err := r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
-               if err != nil {
-                   return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
-               }
-
-               objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
-               if err != nil {
-                   return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
-               }
-
-               opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
-               if err != nil {
-                   return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
-               }
-
-               deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
-               if err != nil {
-                   return reconcile.Result{}, cephObjectStore, err
-               }
-               if !deps.Empty() {
-                   err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
-                   return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
-               }
-               reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)
-
-               // Cancel the context to stop monitoring the health of the object store
-               r.objectStoreContexts[cephObjectStore.Name].internalCancel()
-               r.objectStoreContexts[cephObjectStore.Name].started = false
-
-               cfg := clusterConfig{
-                   context:     r.context,
-                   store:       cephObjectStore,
-                   clusterSpec: r.clusterSpec,
-                   clusterInfo: r.clusterInfo,
-               }
-               cfg.deleteStore()
-
-               // Remove object store from the map
-               delete(r.objectStoreContexts, cephObjectStore.Name)
-           }
+       // get the latest version of the object to check dependencies
+       err = r.client.Get(r.opManagerContext, request.NamespacedName, cephObjectStore)
+       if err != nil {
+           return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to get latest CephObjectStore %q", request.NamespacedName.String())
+       }
+       objCtx, err := NewMultisiteContext(r.context, r.clusterInfo, cephObjectStore)
+       if err != nil {
+           return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get object context")
+       }
+       opsCtx, err := NewMultisiteAdminOpsContext(objCtx, &cephObjectStore.Spec)
+       if err != nil {
+           return reconcile.Result{}, cephObjectStore, errors.Wrapf(err, "failed to check for object buckets. failed to get admin ops API context")
+       }
+       deps, err := cephObjectStoreDependents(r.context, r.clusterInfo, cephObjectStore, objCtx, opsCtx)
+       if err != nil {
+           return reconcile.Result{}, cephObjectStore, err
+       }
+       if !deps.Empty() {
+           err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps)
+           return opcontroller.WaitForRequeueIfFinalizerBlocked, cephObjectStore, err
+       }
+       reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore)
+
+       // Cancel the context to stop monitoring the health of the object store
+       r.stopMonitoring(cephObjectStore)
+
+       cfg := clusterConfig{
+           context:     r.context,
+           store:       cephObjectStore,
+           clusterSpec: r.clusterSpec,
+           clusterInfo: r.clusterInfo,
+       }
+       cfg.deleteStore()

        // Remove finalizer
        err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephObjectStore)
@@ -511,9 +507,15 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
    return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

+func monitoringChannelKey(o *cephv1.CephObjectStore) string {
+   return types.NamespacedName{Namespace: o.Namespace, Name: o.Name}.String()
+}
+
 func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
+   channelKey := monitoringChannelKey(objectstore)
+
    // Start monitoring object store
-   if r.objectStoreContexts[objectstore.Name].started {
+   if r.objectStoreContexts[channelKey].started {
        logger.Info("external rgw endpoint monitoring go routine already running!")
        return nil
    }

Review comment: The object store used to just use the name of the store for the key, which would cause conflicts for stores of the same name in different namespaces.
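As a rough, standalone illustration of the collision described in the comment above (the namespace and store names below are made up, not taken from the PR), a name-only map key collapses two stores that share a name across namespaces, while a types.NamespacedName key keeps them distinct:

```go
// Illustrative only: why a namespace-qualified key avoids name collisions.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// Two object stores with the same name in different namespaces.
	a := types.NamespacedName{Namespace: "team-a", Name: "my-store"}
	b := types.NamespacedName{Namespace: "team-b", Name: "my-store"}

	// A name-only key makes them indistinguishable in a map.
	nameOnly := map[string]bool{a.Name: true, b.Name: true}
	fmt.Println(len(nameOnly)) // 1 — the second store overwrites the first

	// A NamespacedName key ("namespace/name") keeps them separate.
	namespaced := map[string]bool{a.String(): true, b.String(): true}
	fmt.Println(len(namespaced)) // 2 — "team-a/my-store" and "team-b/my-store"
}
```

This is the same namespace/name form that monitoringChannelKey returns in the hunk above.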
@@ -524,10 +526,24 @@ func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjec
    }

    logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
-   go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)
+   go rgwChecker.checkObjectStore(r.objectStoreContexts[channelKey].internalCtx)

    // Set the monitoring flag so we don't start more than one go routine
-   r.objectStoreContexts[objectstore.Name].started = true
+   r.objectStoreContexts[channelKey].started = true

    return nil
}
+
+// cancel monitoring. This is a noop if monitoring is not running.
+func (r *ReconcileCephObjectStore) stopMonitoring(objectstore *cephv1.CephObjectStore) {
+   channelKey := monitoringChannelKey(objectstore)
+
+   _, monitoringContextExists := r.objectStoreContexts[channelKey]
+   if monitoringContextExists {
+       // stop the monitoring routine
+       r.objectStoreContexts[channelKey].internalCancel()
+
+       // remove the monitoring routine from the map
+       delete(r.objectStoreContexts, channelKey)
+   }
+}
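stopMonitoring relies on the standard Go pattern of cancelling a context to stop a background goroutine and then dropping the map entry, so a second call finds nothing to do. Below is a minimal, self-contained sketch of that pattern; the names, key, and timings are illustrative and are not Rook's actual checker code:

```go
// Minimal sketch of the cancel-context pattern behind the health checkers.
// All names here are illustrative.
package main

import (
	"context"
	"fmt"
	"time"
)

// checkLoop runs until its context is cancelled, like the rgw/mirror checkers.
func checkLoop(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("health checker stopped:", ctx.Err())
			return
		case <-ticker.C:
			fmt.Println("health check tick")
		}
	}
}

func main() {
	checkers := map[string]context.CancelFunc{}

	ctx, cancel := context.WithCancel(context.Background())
	checkers["team-a/my-store"] = cancel
	go checkLoop(ctx)

	time.Sleep(300 * time.Millisecond)

	// Idempotent stop: cancel and forget the entry if it exists; a no-op otherwise.
	if stop, ok := checkers["team-a/my-store"]; ok {
		stop()
		delete(checkers, "team-a/my-store")
	}
	time.Sleep(100 * time.Millisecond)
}
```

It is this idempotence that makes the defensive stopMonitoring calls in the IsNotFound and force-delete paths safe to add.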
@@ -38,6 +38,7 @@ import (
    kerrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
+   "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller"
    "sigs.k8s.io/controller-runtime/pkg/handler"
@@ -147,6 +148,10 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
    if err != nil {
        if kerrors.IsNotFound(err) {
            logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.")
+           // If there was a previous error or if a user removed this resource's finalizer, it's
+           // possible Rook didn't clean up the monitoring routine for this resource. Ensure the
+           // routine is stopped when we see the resource is gone.
+           r.cancelMirrorMonitoring(cephBlockPool)
            return reconcile.Result{}, nil
        }
        // Error reading the object - requeue the request.
@@ -174,6 +179,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
    // If not, we should wait for it to be ready
    // This handles the case where the operator is not ready to accept Ceph command but the cluster exists
    if !cephBlockPool.GetDeletionTimestamp().IsZero() && !cephClusterExists {
+       // don't leak the health checker routine if we are force-deleting
+       r.cancelMirrorMonitoring(cephBlockPool)
+
        // Remove finalizer
        err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephBlockPool)
        if err != nil {
@@ -196,7 +204,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

    // Initialize the channel for this pool
    // This allows us to track multiple CephBlockPool in the same namespace
-   blockPoolChannelKey := fmt.Sprintf("%s-%s", cephBlockPool.Namespace, cephBlockPool.Name)
+   blockPoolChannelKey := blockPoolChannelKeyName(cephBlockPool)
    _, blockPoolContextsExists := r.blockPoolContexts[blockPoolChannelKey]
    if !blockPoolContextsExists {
        internalCtx, internalCancel := context.WithCancel(r.opManagerContext)
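A side note on the key change above, not stated in the PR itself: a key built with fmt.Sprintf("%s-%s", ...) is not collision-free, because "-" may appear inside both namespace and pool names, whereas the namespace/name form from types.NamespacedName is unambiguous since "/" is not a legal character in Kubernetes object names. A toy example with made-up names:

```go
// Toy illustration (made-up names): a "-"-joined key can collide across
// different namespace/name pairs; the "namespace/name" form cannot.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// Two different pools whose "-"-joined keys collide.
	a := fmt.Sprintf("%s-%s", "rook", "ceph-pool")
	b := fmt.Sprintf("%s-%s", "rook-ceph", "pool")
	fmt.Println(a, b, a == b) // rook-ceph-pool rook-ceph-pool true

	// The namespace/name form keeps them apart.
	x := types.NamespacedName{Namespace: "rook", Name: "ceph-pool"}.String()
	y := types.NamespacedName{Namespace: "rook-ceph", Name: "pool"}.String()
	fmt.Println(x, y, x == y) // rook/ceph-pool rook-ceph/pool false
}
```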
@@ -210,9 +218,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
    if !cephBlockPool.GetDeletionTimestamp().IsZero() {
        // If the ceph block pool is still in the map, we must remove it during CR deletion
        // We must remove it first otherwise the checker will panic since the status/info will be nil
-       if blockPoolContextsExists {
-           r.cancelMirrorMonitoring(blockPoolChannelKey)
-       }
+       r.cancelMirrorMonitoring(cephBlockPool)

        logger.Infof("deleting pool %q", cephBlockPool.Name)
        err := deletePool(r.context, clusterInfo, cephBlockPool)
@@ -314,7 +320,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile

    // Stop monitoring the mirroring status of this pool
    if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started {
-       r.cancelMirrorMonitoring(blockPoolChannelKey)
+       r.cancelMirrorMonitoring(cephBlockPool)
        // Reset the MirrorHealthCheckSpec
        checker.updateStatusMirroring(nil, nil, nil, "")
    }
@@ -403,10 +409,20 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient
    return nil
}

-func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPoolName string) {
-   // Cancel the context to stop the go routine
-   r.blockPoolContexts[cephBlockPoolName].internalCancel()
-
-   // Remove ceph block pool from the map
-   delete(r.blockPoolContexts, cephBlockPoolName)
-}
+func blockPoolChannelKeyName(p *cephv1.CephBlockPool) string {
+   return types.NamespacedName{Namespace: p.Namespace, Name: p.Name}.String()
+}
+
+// cancel mirror monitoring. This is a noop if monitoring is not running.
+func (r *ReconcileCephBlockPool) cancelMirrorMonitoring(cephBlockPool *cephv1.CephBlockPool) {
+   channelKey := blockPoolChannelKeyName(cephBlockPool)
+
+   _, poolContextExists := r.blockPoolContexts[channelKey]
+   if poolContextExists {
+       // Cancel the context to stop the go routine
+       r.blockPoolContexts[channelKey].internalCancel()
+
+       // Remove ceph block pool from the map
+       delete(r.blockPoolContexts, channelKey)
+   }
+}

Comment on lines -406 to -408: Although the diff makes it look like I deleted this function, I extended it.
Review comment: I don't believe it was ever actually necessary to stop the health checker before looking for dependents. At worst, an error during that check could cause the reconcile to re-run, but I don't think that is possible unless we have multiple simultaneous reconciles in the future. Therefore, I removed the if/if check and un-indented the code below it.