diff --git a/cmd/rook/ceph/cleanup.go b/cmd/rook/ceph/cleanup.go
index 27fb80c6e5a43..e13e92a3067ed 100644
--- a/cmd/rook/ceph/cleanup.go
+++ b/cmd/rook/ceph/cleanup.go
@@ -59,6 +59,8 @@ func startCleanUp(cmd *cobra.Command, args []string) error {
 	rook.SetLogLevel()
 	rook.LogStartupInfo(cleanUpCmd.Flags())
 
+	ctx := cmd.Context()
+
 	logger.Info("starting cluster clean up")
 	// Delete dataDirHostPath
 	if dataDirHostPath != "" {
@@ -67,7 +69,7 @@ func startCleanUp(cmd *cobra.Command, args []string) error {
 	}
 
 	namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
-	clusterInfo := client.AdminClusterInfo(namespace, "")
+	clusterInfo := client.AdminClusterInfo(ctx, namespace, "")
 	clusterInfo.FSID = clusterFSID
 
 	// Build Sanitizer
diff --git a/pkg/daemon/ceph/client/info.go b/pkg/daemon/ceph/client/info.go
index e0c28890d0868..047cd99bccf85 100644
--- a/pkg/daemon/ceph/client/info.go
+++ b/pkg/daemon/ceph/client/info.go
@@ -88,7 +88,7 @@ func (c *ClusterInfo) NamespacedName() types.NamespacedName {
 
 // AdminClusterInfo() creates a ClusterInfo with the basic info to access the cluster
 // as an admin.
-func AdminClusterInfo(namespace, name string) *ClusterInfo {
+func AdminClusterInfo(ctx context.Context, namespace, name string) *ClusterInfo {
 	ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(&metav1.OwnerReference{}, "")
 	return &ClusterInfo{
 		Namespace: namespace,
@@ -97,14 +97,14 @@ func AdminClusterInfo(namespace, name string) *ClusterInfo {
 		},
 		name:      name,
 		OwnerInfo: ownerInfo,
-		Context:   context.TODO(),
+		Context:   ctx,
 	}
 }
 
 // AdminTestClusterInfo() creates a ClusterInfo with the basic info to access the cluster
 // as an admin. This cluster info should only be used by unit or integration tests.
 func AdminTestClusterInfo(namespace string) *ClusterInfo {
-	return AdminClusterInfo(namespace, "testing")
+	return AdminClusterInfo(context.TODO(), namespace, "testing")
 }
 
 // IsInitialized returns true if the critical information in the ClusterInfo struct has been filled
diff --git a/pkg/operator/ceph/cluster/cluster.go b/pkg/operator/ceph/cluster/cluster.go
index c9cf926f45662..6ecb9402fc8fc 100755
--- a/pkg/operator/ceph/cluster/cluster.go
+++ b/pkg/operator/ceph/cluster/cluster.go
@@ -66,12 +66,12 @@ type clusterHealth struct {
 	internalCancel context.CancelFunc
 }
 
-func newCluster(c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster {
+func newCluster(ctx context.Context, c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster {
 	return &cluster{
 		// at this phase of the cluster creation process, the identity components of the cluster are
 		// not yet established. we reserve this struct which is filled in as soon as the cluster's
 		// identity can be established.
-		ClusterInfo: client.AdminClusterInfo(c.Namespace, c.Name),
+		ClusterInfo: client.AdminClusterInfo(ctx, c.Namespace, c.Name),
 		Namespace:   c.Namespace,
 		Spec:        &c.Spec,
 		context:     context,
diff --git a/pkg/operator/ceph/cluster/controller.go b/pkg/operator/ceph/cluster/controller.go
index f1fa428745f80..0ffa9a82321d5 100644
--- a/pkg/operator/ceph/cluster/controller.go
+++ b/pkg/operator/ceph/cluster/controller.go
@@ -177,7 +177,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
 			},
 		},
 		handler.EnqueueRequestsFromMapFunc(handlerFunc),
-		predicateForNodeWatcher(mgr.GetClient(), context))
+		predicateForNodeWatcher(opManagerContext, mgr.GetClient(), context))
 	if err != nil {
 		return err
 	}
@@ -340,7 +340,7 @@ func (c *ClusterController) reconcileCephCluster(clusterObj *cephv1.CephCluster,
 	cluster, ok := c.clusterMap[clusterObj.Namespace]
 	if !ok {
 		// It's a new cluster so let's populate the struct
-		cluster = newCluster(clusterObj, c.context, ownerInfo)
+		cluster = newCluster(c.OpManagerCtx, clusterObj, c.context, ownerInfo)
 	}
 
 	cluster.namespacedName = c.namespacedName
diff --git a/pkg/operator/ceph/cluster/predicate.go b/pkg/operator/ceph/cluster/predicate.go
index a0113368ff906..001adb5482145 100644
--- a/pkg/operator/ceph/cluster/predicate.go
+++ b/pkg/operator/ceph/cluster/predicate.go
@@ -35,16 +35,16 @@ import (
 )
 
 // predicateForNodeWatcher is the predicate function to trigger reconcile on Node events
-func predicateForNodeWatcher(client client.Client, context *clusterd.Context) predicate.Funcs {
+func predicateForNodeWatcher(ctx context.Context, client client.Client, context *clusterd.Context) predicate.Funcs {
 	return predicate.Funcs{
 		CreateFunc: func(e event.CreateEvent) bool {
 			clientCluster := newClientCluster(client, e.Object.GetNamespace(), context)
-			return clientCluster.onK8sNode(e.Object)
+			return clientCluster.onK8sNode(ctx, e.Object)
 		},
 
 		UpdateFunc: func(e event.UpdateEvent) bool {
 			clientCluster := newClientCluster(client, e.ObjectNew.GetNamespace(), context)
-			return clientCluster.onK8sNode(e.ObjectNew)
+			return clientCluster.onK8sNode(ctx, e.ObjectNew)
 		},
 
 		DeleteFunc: func(e event.DeleteEvent) bool {
diff --git a/pkg/operator/ceph/cluster/watcher.go b/pkg/operator/ceph/cluster/watcher.go
index 2f430149b08d2..f4e822a893577 100644
--- a/pkg/operator/ceph/cluster/watcher.go
+++ b/pkg/operator/ceph/cluster/watcher.go
@@ -60,7 +60,7 @@ func checkStorageForNode(cluster *cephv1.CephCluster) bool {
 }
 
 // onK8sNodeAdd is triggered when a node is added in the Kubernetes cluster
-func (c *clientCluster) onK8sNode(object runtime.Object) bool {
+func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bool {
 	node, ok := object.(*v1.Node)
 	if !ok {
 		return false
@@ -106,7 +106,7 @@ func (c *clientCluster) onK8sNode(object runtime.Object) bool {
 	// Is the node in the CRUSH map already?
 	// If so we don't need to reconcile, this is done to avoid double reconcile on operator restart
 	// Assume the admin key since we are watching for node status to create OSDs
-	clusterInfo := cephclient.AdminClusterInfo(cluster.Namespace, cluster.Name)
+	clusterInfo := cephclient.AdminClusterInfo(ctx, cluster.Namespace, cluster.Name)
 	osds, err := cephclient.GetOSDOnHost(c.context, clusterInfo, nodeName)
 	if err != nil {
 		if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) {
diff --git a/pkg/operator/ceph/cluster/watcher_test.go b/pkg/operator/ceph/cluster/watcher_test.go
index 17f1834df6e96..7ef89d1df5b45 100644
--- a/pkg/operator/ceph/cluster/watcher_test.go
+++ b/pkg/operator/ceph/cluster/watcher_test.go
@@ -87,6 +87,7 @@ func TestCheckStorageForNode(t *testing.T) {
 
 func TestOnK8sNode(t *testing.T) {
 	ns := "rook-ceph"
+	ctx := context.TODO()
 	cephCluster := fakeCluster(ns)
 	objects := []runtime.Object{
 		cephCluster,
@@ -136,11 +137,11 @@ func TestOnK8sNode(t *testing.T) {
 	cephCluster.Status.Phase = k8sutil.ReadyStatus
 	client = getFakeClient(objects...)
 	clientCluster.client = client
-	b := clientCluster.onK8sNode(node)
+	b := clientCluster.onK8sNode(ctx, node)
 	assert.True(t, b)
 
 	// node will not reconcile
-	b = clientCluster.onK8sNode(node)
+	b = clientCluster.onK8sNode(ctx, node)
 	assert.False(t, b)
 }
 
diff --git a/pkg/operator/ceph/disruption/machinedisruption/reconcile.go b/pkg/operator/ceph/disruption/machinedisruption/reconcile.go
index cd22d320eaf4f..3c0cdff5e1e65 100644
--- a/pkg/operator/ceph/disruption/machinedisruption/reconcile.go
+++ b/pkg/operator/ceph/disruption/machinedisruption/reconcile.go
@@ -134,7 +134,7 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco
 		mdb.Spec.MaxUnavailable = &maxUnavailable
 	}
 	// Check if the cluster is clean or not
-	clusterInfo := cephClient.AdminClusterInfo(request.NamespacedName.Namespace, request.NamespacedName.Name)
+	clusterInfo := cephClient.AdminClusterInfo(r.context.OpManagerContext, request.NamespacedName.Namespace, request.NamespacedName.Name)
 	_, isClean, err := cephClient.IsClusterClean(r.context.ClusterdContext, clusterInfo)
 	if err != nil {
 		maxUnavailable := int32(0)
diff --git a/pkg/operator/ceph/object/topic/controller_test.go b/pkg/operator/ceph/object/topic/controller_test.go
index c716b30fa37bc..67cbdfa27b7f7 100644
--- a/pkg/operator/ceph/object/topic/controller_test.go
+++ b/pkg/operator/ceph/object/topic/controller_test.go
@@ -86,7 +86,7 @@ func TestCephBucketTopicController(t *testing.T) {
 			},
 		},
 	}
-	clusterInfo := cephclient.AdminClusterInfo(namespace, "rook")
+	clusterInfo := cephclient.AdminClusterInfo(ctx, namespace, "rook")
 	clusterSpec := cephv1.ClusterSpec{}
 	req := reconcile.Request{
 		NamespacedName: types.NamespacedName{