core: Add context parameter to AdminClusterInfo
This commit removes the usage of context.TODO when creating a ClusterInfo
to access the cluster as an admin. It also adds a context parameter to the
functions calling AdminClusterInfo so that the context is passed down. This
will allow us to handle cancellation in all the methods that use the
context from ClusterInfo.

Closes: rook#8701
Signed-off-by: Divyansh Kamboj <dkamboj@redhat.com>
weirdwiz committed Jan 31, 2022
1 parent eb8d42b commit b4d9cce
Showing 9 changed files with 20 additions and 17 deletions.
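
For reference, a minimal sketch (not part of this commit) of how a caller can now hand a cancellable context to AdminClusterInfo. The client.AdminClusterInfo call matches the new signature introduced below; the signal wiring, the main function, and the "POD_NAMESPACE" environment variable name are illustrative assumptions.

// Sketch only: a cancellable context flows into ClusterInfo.Context, so the
// Ceph client helpers that read it can be interrupted instead of running
// against context.TODO(). Signal handling and env var name are assumptions.
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/rook/rook/pkg/daemon/ceph/client"
)

func main() {
	// Cancel the context on SIGTERM so every method that reads
	// ClusterInfo.Context stops instead of blocking indefinitely.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer cancel()

	namespace := os.Getenv("POD_NAMESPACE")
	clusterInfo := client.AdminClusterInfo(ctx, namespace, "")

	// Any helper that receives clusterInfo can observe cancellation via
	// clusterInfo.Context (as the OSD and cleanup code paths below do).
	_ = clusterInfo
}

Passing the context at construction time keeps the ClusterInfo-based helper signatures unchanged while still letting the operator's opManagerContext, or a command's cmd.Context(), drive cancellation.
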
4 changes: 3 additions & 1 deletion cmd/rook/ceph/cleanup.go
@@ -59,6 +59,8 @@ func startCleanUp(cmd *cobra.Command, args []string) error {
 	rook.SetLogLevel()
 	rook.LogStartupInfo(cleanUpCmd.Flags())
 
+	ctx := cmd.Context()
+
 	logger.Info("starting cluster clean up")
 	// Delete dataDirHostPath
 	if dataDirHostPath != "" {
@@ -67,7 +69,7 @@ func startCleanUp(cmd *cobra.Command, args []string) error {
 	}
 
 	namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
-	clusterInfo := client.AdminClusterInfo(namespace, "")
+	clusterInfo := client.AdminClusterInfo(ctx, namespace, "")
 	clusterInfo.FSID = clusterFSID
 
 	// Build Sanitizer
6 changes: 3 additions & 3 deletions pkg/daemon/ceph/client/info.go
@@ -88,7 +88,7 @@ func (c *ClusterInfo) NamespacedName() types.NamespacedName {
 
 // AdminClusterInfo() creates a ClusterInfo with the basic info to access the cluster
 // as an admin.
-func AdminClusterInfo(namespace, name string) *ClusterInfo {
+func AdminClusterInfo(ctx context.Context, namespace, name string) *ClusterInfo {
 	ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(&metav1.OwnerReference{}, "")
 	return &ClusterInfo{
 		Namespace: namespace,
@@ -97,14 +97,14 @@ func AdminClusterInfo(namespace, name string) *ClusterInfo {
 		},
 		name: name,
 		OwnerInfo: ownerInfo,
-		Context: context.TODO(),
+		Context: ctx,
 	}
 }
 
 // AdminTestClusterInfo() creates a ClusterInfo with the basic info to access the cluster
 // as an admin. This cluster info should only be used by unit or integration tests.
 func AdminTestClusterInfo(namespace string) *ClusterInfo {
-	return AdminClusterInfo(namespace, "testing")
+	return AdminClusterInfo(context.TODO(), namespace, "testing")
 }
 
 // IsInitialized returns true if the critical information in the ClusterInfo struct has been filled
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/cluster.go
@@ -66,12 +66,12 @@ type clusterHealth struct {
 	internalCancel context.CancelFunc
 }
 
-func newCluster(c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster {
+func newCluster(ctx context.Context, c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster {
 	return &cluster{
 		// at this phase of the cluster creation process, the identity components of the cluster are
 		// not yet established. we reserve this struct which is filled in as soon as the cluster's
 		// identity can be established.
-		ClusterInfo: client.AdminClusterInfo(c.Namespace, c.Name),
+		ClusterInfo: client.AdminClusterInfo(ctx, c.Namespace, c.Name),
 		Namespace: c.Namespace,
 		Spec: &c.Spec,
 		context: context,
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/controller.go
@@ -177,7 +177,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
 			},
 		},
 		handler.EnqueueRequestsFromMapFunc(handlerFunc),
-		predicateForNodeWatcher(mgr.GetClient(), context))
+		predicateForNodeWatcher(opManagerContext, mgr.GetClient(), context))
 	if err != nil {
 		return err
 	}
@@ -340,7 +340,7 @@ func (c *ClusterController) reconcileCephCluster(clusterObj *cephv1.CephCluster,
 	cluster, ok := c.clusterMap[clusterObj.Namespace]
 	if !ok {
 		// It's a new cluster so let's populate the struct
-		cluster = newCluster(clusterObj, c.context, ownerInfo)
+		cluster = newCluster(c.OpManagerCtx, clusterObj, c.context, ownerInfo)
 	}
 	cluster.namespacedName = c.namespacedName
 
6 changes: 3 additions & 3 deletions pkg/operator/ceph/cluster/predicate.go
@@ -35,16 +35,16 @@ import (
 )
 
 // predicateForNodeWatcher is the predicate function to trigger reconcile on Node events
-func predicateForNodeWatcher(client client.Client, context *clusterd.Context) predicate.Funcs {
+func predicateForNodeWatcher(ctx context.Context, client client.Client, context *clusterd.Context) predicate.Funcs {
 	return predicate.Funcs{
 		CreateFunc: func(e event.CreateEvent) bool {
 			clientCluster := newClientCluster(client, e.Object.GetNamespace(), context)
-			return clientCluster.onK8sNode(e.Object)
+			return clientCluster.onK8sNode(ctx, e.Object)
 		},
 
 		UpdateFunc: func(e event.UpdateEvent) bool {
 			clientCluster := newClientCluster(client, e.ObjectNew.GetNamespace(), context)
-			return clientCluster.onK8sNode(e.ObjectNew)
+			return clientCluster.onK8sNode(ctx, e.ObjectNew)
 		},
 
 		DeleteFunc: func(e event.DeleteEvent) bool {
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/watcher.go
@@ -60,7 +60,7 @@ func checkStorageForNode(cluster *cephv1.CephCluster) bool {
 }
 
 // onK8sNodeAdd is triggered when a node is added in the Kubernetes cluster
-func (c *clientCluster) onK8sNode(object runtime.Object) bool {
+func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bool {
 	node, ok := object.(*v1.Node)
 	if !ok {
 		return false
@@ -106,7 +106,7 @@ func (c *clientCluster) onK8sNode(object runtime.Object) bool {
 	// Is the node in the CRUSH map already?
 	// If so we don't need to reconcile, this is done to avoid double reconcile on operator restart
 	// Assume the admin key since we are watching for node status to create OSDs
-	clusterInfo := cephclient.AdminClusterInfo(cluster.Namespace, cluster.Name)
+	clusterInfo := cephclient.AdminClusterInfo(ctx, cluster.Namespace, cluster.Name)
 	osds, err := cephclient.GetOSDOnHost(c.context, clusterInfo, nodeName)
 	if err != nil {
 		if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) {
5 changes: 3 additions & 2 deletions pkg/operator/ceph/cluster/watcher_test.go
@@ -87,6 +87,7 @@ func TestCheckStorageForNode(t *testing.T) {
 
 func TestOnK8sNode(t *testing.T) {
 	ns := "rook-ceph"
+	ctx := context.TODO()
 	cephCluster := fakeCluster(ns)
 	objects := []runtime.Object{
 		cephCluster,
@@ -136,11 +137,11 @@ func TestOnK8sNode(t *testing.T) {
 	cephCluster.Status.Phase = k8sutil.ReadyStatus
 	client = getFakeClient(objects...)
 	clientCluster.client = client
-	b := clientCluster.onK8sNode(node)
+	b := clientCluster.onK8sNode(ctx, node)
 	assert.True(t, b)
 
 	// node will not reconcile
-	b = clientCluster.onK8sNode(node)
+	b = clientCluster.onK8sNode(ctx, node)
 	assert.False(t, b)
 }
 
@@ -134,7 +134,7 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco
 		mdb.Spec.MaxUnavailable = &maxUnavailable
 	}
 	// Check if the cluster is clean or not
-	clusterInfo := cephClient.AdminClusterInfo(request.NamespacedName.Namespace, request.NamespacedName.Name)
+	clusterInfo := cephClient.AdminClusterInfo(r.context.OpManagerContext, request.NamespacedName.Namespace, request.NamespacedName.Name)
 	_, isClean, err := cephClient.IsClusterClean(r.context.ClusterdContext, clusterInfo)
 	if err != nil {
 		maxUnavailable := int32(0)
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/topic/controller_test.go
@@ -86,7 +86,7 @@ func TestCephBucketTopicController(t *testing.T) {
 			},
 		},
 	}
-	clusterInfo := cephclient.AdminClusterInfo(namespace, "rook")
+	clusterInfo := cephclient.AdminClusterInfo(ctx, namespace, "rook")
 	clusterSpec := cephv1.ClusterSpec{}
 	req := reconcile.Request{
 		NamespacedName: types.NamespacedName{
