ceph: add context parameter to opcontroller
This commit adds a context parameter to the utilities in opcontroller so that
opcontroller no longer needs to use context.TODO. This lets us handle
cancellation of reconcilers in a fine-grained way. Part of rook#8701.

Signed-off-by: Yuichiro Ueno <y1r.ueno@gmail.com>
y1r committed Oct 23, 2021
1 parent 1a6827b commit a38c1f1
Showing 26 changed files with 81 additions and 79 deletions.
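
The pattern applied throughout the diff below is the same in every file: each opcontroller utility gains a leading context.Context argument, and the caller's context (the operator manager context or the cluster info context) is passed to the controller-runtime client instead of context.TODO(). As a rough illustration only, here is a minimal hypothetical sketch of that shape; the helper name, signature, and finalizer handling are simplified and are not the actual Rook code:

package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// addFinalizerIfNotPresent is a simplified stand-in for the opcontroller helper.
// The caller's ctx flows into the API call, so cancelling the operator manager
// context also cancels an in-flight finalizer update.
func addFinalizerIfNotPresent(ctx context.Context, c client.Client, obj client.Object, finalizer string) error {
	for _, f := range obj.GetFinalizers() {
		if f == finalizer {
			// Finalizer is already set; nothing to update.
			return nil
		}
	}
	obj.SetFinalizers(append(obj.GetFinalizers(), finalizer))
	// Before this change the update would have used context.TODO().
	return c.Update(ctx, obj)
}

A reconciler would then call the helper with its stored operator manager context, for example addFinalizerIfNotPresent(r.opManagerContext, r.client, cephClient, finalizerName), mirroring the call-site changes shown in the diffs below (removed lines are prefixed with -, added lines with +).
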
8 changes: 4 additions & 4 deletions pkg/operator/ceph/client/controller.go
@@ -141,7 +141,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Set a finalizer so we can do cleanup before the object goes away
- err = opcontroller.AddFinalizerIfNotPresent(r.client, cephClient)
+ err = opcontroller.AddFinalizerIfNotPresent(r.opManagerContext, r.client, cephClient)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to add finalizer")
}
@@ -152,7 +152,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Make sure a CephCluster is present otherwise do nothing
- _, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
+ _, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
// We skip the deletePool() function since everything is gone already
@@ -162,7 +162,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
// This handles the case where the operator is not ready to accept Ceph command but the cluster exists
if !cephClient.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// Remove finalizer
- err = opcontroller.RemoveFinalizer(r.client, cephClient)
+ err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephClient)
if err != nil {
return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to remove finalizer")
}
@@ -189,7 +189,7 @@ func (r *ReconcileCephClient) reconcile(request reconcile.Request) (reconcile.Re
}

// Remove finalizer
- err = opcontroller.RemoveFinalizer(r.client, cephClient)
+ err = opcontroller.RemoveFinalizer(r.opManagerContext, r.client, cephClient)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to remove finalizer")
}
16 changes: 8 additions & 8 deletions pkg/operator/ceph/cluster/cluster.go
@@ -92,7 +92,7 @@ func (c *cluster) reconcileCephDaemons(rookImage string, cephVersion cephver.Cep
}

// Start the mon pods
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph Mons")
+ controller.UpdateCondition(c.ClusterInfo.Context, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph Mons")
clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec)
if err != nil {
return errors.Wrap(err, "failed to start ceph monitors")
@@ -120,15 +120,15 @@ func (c *cluster) reconcileCephDaemons(rookImage string, cephVersion cephver.Cep
}

// Start Ceph manager
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph Mgr(s)")
+ controller.UpdateCondition(c.ClusterInfo.Context, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph Mgr(s)")
mgrs := mgr.New(c.context, c.ClusterInfo, *c.Spec, rookImage)
err = mgrs.Start()
if err != nil {
return errors.Wrap(err, "failed to start ceph mgr")
}

// Start the OSDs
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph OSDs")
+ controller.UpdateCondition(c.ClusterInfo.Context, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring Ceph OSDs")
osds := osd.New(c.context, c.ClusterInfo, *c.Spec, rookImage)
err = osds.Start()
if err != nil {
@@ -179,7 +179,7 @@ func (c *ClusterController) initializeCluster(cluster *cluster) error {
if cluster.Spec.External.Enable {
err := c.configureExternalCephCluster(cluster)
if err != nil {
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionFalse, cephv1.ClusterProgressingReason, err.Error())
+ controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionFalse, cephv1.ClusterProgressingReason, err.Error())
return errors.Wrap(err, "failed to configure external ceph cluster")
}
} else {
@@ -194,7 +194,7 @@ func (c *ClusterController) initializeCluster(cluster *cluster) error {

err = c.configureLocalCephCluster(cluster)
if err != nil {
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionFalse, cephv1.ClusterProgressingReason, err.Error())
+ controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionFalse, cephv1.ClusterProgressingReason, err.Error())
return errors.Wrap(err, "failed to configure local ceph cluster")
}
}
@@ -216,7 +216,7 @@ func (c *ClusterController) configureLocalCephCluster(cluster *cluster) error {
}

// Run image validation job
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Detecting Ceph version")
+ controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Detecting Ceph version")
cephVersion, isUpgrade, err := c.detectAndValidateCephVersion(cluster)
if err != nil {
return errors.Wrap(err, "failed the ceph version check")
@@ -231,7 +231,7 @@ func (c *ClusterController) configureLocalCephCluster(cluster *cluster) error {
}
}

- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring the Ceph cluster")
+ controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, "Configuring the Ceph cluster")

cluster.ClusterInfo.Context = c.OpManagerCtx
// Run the orchestration
@@ -241,7 +241,7 @@ func (c *ClusterController) configureLocalCephCluster(cluster *cluster) error {
}

// Set the condition to the cluster object
- controller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionReady, v1.ConditionTrue, cephv1.ClusterCreatedReason, "Cluster created successfully")
+ controller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionReady, v1.ConditionTrue, cephv1.ClusterCreatedReason, "Cluster created successfully")
return nil
}

2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/cluster_external.go
@@ -44,7 +44,7 @@ func (c *ClusterController) configureExternalCephCluster(cluster *cluster) error
return errors.Wrap(err, "failed to validate external cluster specs")
}

- opcontroller.UpdateCondition(c.context, c.namespacedName, cephv1.ConditionConnecting, v1.ConditionTrue, cephv1.ClusterConnectingReason, "Attempting to connect to an external Ceph cluster")
+ opcontroller.UpdateCondition(c.OpManagerCtx, c.context, c.namespacedName, cephv1.ConditionConnecting, v1.ConditionTrue, cephv1.ClusterConnectingReason, "Attempting to connect to an external Ceph cluster")

// loop until we find the secret necessary to connect to the external cluster
// then populate clusterInfo
12 changes: 6 additions & 6 deletions pkg/operator/ceph/cluster/controller.go
@@ -104,7 +104,7 @@ type ReconcileCephCluster struct {
// Add creates a new CephCluster Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, ctx *clusterd.Context, clusterController *ClusterController, opManagerContext context.Context) error {
- return add(mgr, newReconciler(mgr, ctx, clusterController, opManagerContext), ctx)
+ return add(opManagerContext, mgr, newReconciler(mgr, ctx, clusterController, opManagerContext), ctx)
}

// newReconciler returns a new reconcile.Reconciler
@@ -123,7 +123,7 @@ func newReconciler(mgr manager.Manager, ctx *clusterd.Context, clusterController
}
}

- func add(mgr manager.Manager, r reconcile.Reconciler, context *clusterd.Context) error {
+ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reconciler, context *clusterd.Context) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
@@ -162,7 +162,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, context *clusterd.Context)

// Build Handler function to return the list of ceph clusters
// This is used by the watchers below
- handlerFunc, err := opcontroller.ObjectToCRMapper(mgr.GetClient(), &cephv1.CephClusterList{}, mgr.GetScheme())
+ handlerFunc, err := opcontroller.ObjectToCRMapper(opManagerContext, mgr.GetClient(), &cephv1.CephClusterList{}, mgr.GetScheme())
if err != nil {
return err
}
@@ -244,7 +244,7 @@ func (r *ReconcileCephCluster) reconcile(request reconcile.Request) (reconcile.R
}

// Set a finalizer so we can do cleanup before the object goes away
- err = opcontroller.AddFinalizerIfNotPresent(r.client, cephCluster)
+ err = opcontroller.AddFinalizerIfNotPresent(r.opManagerContext, r.client, cephCluster)
if err != nil {
return reconcile.Result{}, cephCluster, errors.Wrap(err, "failed to add finalizer")
}
@@ -496,12 +496,12 @@ func (r *ReconcileCephCluster) removeFinalizer(client client.Client, name types.
}

if finalizer == "" {
- err = opcontroller.RemoveFinalizer(client, obj)
+ err = opcontroller.RemoveFinalizer(r.opManagerContext, client, obj)
if err != nil {
return errors.Wrap(err, "failed to remove finalizer")
}
} else {
- err = opcontroller.RemoveFinalizerWithName(client, obj, finalizer)
+ err = opcontroller.RemoveFinalizerWithName(r.opManagerContext, client, obj, finalizer)
if err != nil {
return errors.Wrapf(err, "failed to remove finalizer %q", finalizer)
}
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/create.go
@@ -386,7 +386,7 @@ func createDaemonOnPVC(c *Cluster, osd OSDInfo, pvcName string, config *provisio
}

message := fmt.Sprintf("Processing OSD %d on PVC %q", osd.ID, pvcName)
- updateConditionFunc(c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
+ updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)

_, err = k8sutil.CreateDeployment(c.context.Clientset, d)
return errors.Wrapf(err, "failed to create deployment for OSD %d on PVC %q", osd.ID, pvcName)
@@ -399,7 +399,7 @@ func createDaemonOnNode(c *Cluster, osd OSDInfo, nodeName string, config *provis
}

message := fmt.Sprintf("Processing OSD %d on node %q", osd.ID, nodeName)
- updateConditionFunc(c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
+ updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)

_, err = k8sutil.CreateDeployment(c.context.Clientset, d)
return errors.Wrapf(err, "failed to create deployment for OSD %d on node %q", osd.ID, nodeName)
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/integration_test.go
@@ -110,7 +110,7 @@ func testOSDIntegration(t *testing.T) {
}()
// stub out the conditionExportFunc to do nothing. we do not have a fake Rook interface that
// allows us to interact with a CephCluster resource like the fake K8s clientset.
- updateConditionFunc = func(c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
+ updateConditionFunc = func(ctx context.Context, c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
// do nothing
}

2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/osd_test.go
@@ -132,7 +132,7 @@ func TestAddRemoveNode(t *testing.T) {
}()
// stub out the conditionExportFunc to do nothing. we do not have a fake Rook interface that
// allows us to interact with a CephCluster resource like the fake K8s clientset.
- updateConditionFunc = func(c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
+ updateConditionFunc = func(ctx context.Context, c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
// do nothing
}

4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/update.go
@@ -147,7 +147,7 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) {
updatedDep, err = deploymentOnPVCFunc(c.cluster, osdInfo, nodeOrPVCName, c.provisionConfig)

message := fmt.Sprintf("Processing OSD %d on PVC %q", osdID, nodeOrPVCName)
- updateConditionFunc(c.cluster.context, c.cluster.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
+ updateConditionFunc(c.cluster.clusterInfo.Context, c.cluster.context, c.cluster.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
} else {
if !c.cluster.ValidStorage.NodeExists(nodeOrPVCName) {
// node will not reconcile, so don't update the deployment
@@ -164,7 +164,7 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) {
updatedDep, err = deploymentOnNodeFunc(c.cluster, osdInfo, nodeOrPVCName, c.provisionConfig)

message := fmt.Sprintf("Processing OSD %d on node %q", osdID, nodeOrPVCName)
- updateConditionFunc(c.cluster.context, c.cluster.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
+ updateConditionFunc(c.cluster.clusterInfo.Context, c.cluster.context, c.cluster.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)
}
if err != nil {
errs.addError("%v", errors.Wrapf(err, "failed to update OSD %d", osdID))
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/update_test.go
@@ -121,7 +121,7 @@ func Test_updateExistingOSDs(t *testing.T) {

// stub out the conditionExportFunc to do nothing. we do not have a fake Rook interface that
// allows us to interact with a CephCluster resource like the fake K8s clientset.
- updateConditionFunc = func(c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
+ updateConditionFunc = func(ctx context.Context, c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status corev1.ConditionStatus, reason cephv1.ConditionReason, message string) {
// do nothing
}
shouldCheckOkToStopFunc = func(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo) bool {
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller.go
@@ -172,7 +172,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile
}

// Make sure a CephCluster is present otherwise do nothing
- cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.client, r.context, request.NamespacedName, controllerName)
+ cephCluster, isReadyToReconcile, _, reconcileResponse := opcontroller.IsReadyToReconcile(r.opManagerContext, r.client, r.context, request.NamespacedName, controllerName)
if !isReadyToReconcile {
logger.Debugf("CephCluster resource not ready in namespace %q, retrying in %q.", request.NamespacedName.Namespace, reconcileResponse.RequeueAfter.String())
return reconcileResponse, nil
4 changes: 2 additions & 2 deletions pkg/operator/ceph/controller/conditions.go
@@ -30,10 +30,10 @@ import (
)

// UpdateCondition function will export each condition into the cluster custom resource
- func UpdateCondition(c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status v1.ConditionStatus, reason cephv1.ConditionReason, message string) {
+ func UpdateCondition(ctx context.Context, c *clusterd.Context, namespaceName types.NamespacedName, conditionType cephv1.ConditionType, status v1.ConditionStatus, reason cephv1.ConditionReason, message string) {
// use client.Client unit test this more easily with updating statuses which must use the client
cluster := &cephv1.CephCluster{}
- if err := c.Client.Get(context.TODO(), namespaceName, cluster); err != nil {
+ if err := c.Client.Get(ctx, namespaceName, cluster); err != nil {
logger.Errorf("failed to get cluster %v to update the conditions. %v", namespaceName, err)
return
}
4 changes: 2 additions & 2 deletions pkg/operator/ceph/controller/controller_utils.go
@@ -114,14 +114,14 @@ func canIgnoreHealthErrStatusInReconcile(cephCluster cephv1.CephCluster, control
}

// IsReadyToReconcile determines if a controller is ready to reconcile or not
- func IsReadyToReconcile(c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
+ func IsReadyToReconcile(ctx context.Context, c client.Client, clustercontext *clusterd.Context, namespacedName types.NamespacedName, controllerName string) (cephv1.CephCluster, bool, bool, reconcile.Result) {
cephClusterExists := false

// Running ceph commands won't work and the controller will keep re-queuing so I believe it's fine not to check
// Make sure a CephCluster exists before doing anything
var cephCluster cephv1.CephCluster
clusterList := &cephv1.CephClusterList{}
- err := c.List(context.TODO(), clusterList, client.InNamespace(namespacedName.Namespace))
+ err := c.List(ctx, clusterList, client.InNamespace(namespacedName.Namespace))
if err != nil {
logger.Errorf("%q: failed to fetch CephCluster %v", controllerName, err)
return cephCluster, false, cephClusterExists, ImmediateRetryResult
14 changes: 7 additions & 7 deletions pkg/operator/ceph/controller/finalizer.go
@@ -52,7 +52,7 @@ func remove(list []string, s string) []string {

// AddFinalizerIfNotPresent adds a finalizer an object to avoid instant deletion
// of the object without finalizing it.
- func AddFinalizerIfNotPresent(client client.Client, obj client.Object) error {
+ func AddFinalizerIfNotPresent(ctx context.Context, client client.Client, obj client.Object) error {
objectFinalizer := buildFinalizerName(obj.GetObjectKind().GroupVersionKind().Kind)

accessor, err := meta.Accessor(obj)
@@ -65,7 +65,7 @@ func AddFinalizerIfNotPresent(client client.Client, obj client.Object) error {
accessor.SetFinalizers(append(accessor.GetFinalizers(), objectFinalizer))

// Update CR with finalizer
- if err := client.Update(context.TODO(), obj); err != nil {
+ if err := client.Update(ctx, obj); err != nil {
return errors.Wrapf(err, "failed to add finalizer %q on %q", objectFinalizer, accessor.GetName())
}
}
@@ -74,14 +74,14 @@ func AddFinalizerIfNotPresent(client client.Client, obj client.Object) error {
}

// RemoveFinalizer removes a finalizer from an object
- func RemoveFinalizer(client client.Client, obj client.Object) error {
+ func RemoveFinalizer(ctx context.Context, client client.Client, obj client.Object) error {
finalizerName := buildFinalizerName(obj.GetObjectKind().GroupVersionKind().Kind)
- return RemoveFinalizerWithName(client, obj, finalizerName)
+ return RemoveFinalizerWithName(ctx, client, obj, finalizerName)
}

// RemoveFinalizerWithName removes finalizer passed as an argument from an object
- func RemoveFinalizerWithName(client client.Client, obj client.Object, finalizerName string) error {
- err := client.Get(context.TODO(), types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, obj)
+ func RemoveFinalizerWithName(ctx context.Context, client client.Client, obj client.Object, finalizerName string) error {
+ err := client.Get(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, obj)
if err != nil {
return errors.Wrap(err, "failed to get the latest version of the object")
}
@@ -93,7 +93,7 @@ func RemoveFinalizerWithName(client client.Client, obj client.Object, finalizerN
if contains(accessor.GetFinalizers(), finalizerName) {
logger.Infof("removing finalizer %q on %q", finalizerName, accessor.GetName())
accessor.SetFinalizers(remove(accessor.GetFinalizers(), finalizerName))
- if err := client.Update(context.TODO(), obj); err != nil {
+ if err := client.Update(ctx, obj); err != nil {
return errors.Wrapf(err, "failed to remove finalizer %q on %q", finalizerName, accessor.GetName())
}
}
