From 0b575703c7313f21d4fd53e3469af9ebdcea0439 Mon Sep 17 00:00:00 2001
From: Yuichiro Ueno
Date: Sat, 13 Nov 2021 14:54:44 +0900
Subject: [PATCH] core: add context parameter to k8sutil deployment

This commit adds a context parameter to the k8sutil deployment functions.
With this, we can handle cancellation during API calls on deployment
resources.

Signed-off-by: Yuichiro Ueno
---
 pkg/daemon/ceph/osd/remove.go | 2 +-
 pkg/operator/ceph/cluster/mgr/mgr.go | 2 +-
 pkg/operator/ceph/cluster/mgr/mgr_test.go | 2 +-
 pkg/operator/ceph/cluster/mon/mon.go | 4 +-
 pkg/operator/ceph/cluster/mon/spec.go | 2 +-
 pkg/operator/ceph/cluster/osd/create.go | 4 +-
 pkg/operator/ceph/cluster/osd/health.go | 4 +-
 pkg/operator/ceph/cluster/osd/update.go | 2 +-
 pkg/operator/ceph/cluster/osd/update_test.go | 1 +
 pkg/operator/ceph/csi/controller.go | 4 +-
 pkg/operator/ceph/csi/peermap/config.go | 15 ++++---
 pkg/operator/ceph/csi/peermap/config_test.go | 6 +--
 pkg/operator/ceph/csi/spec.go | 6 +--
 pkg/operator/ceph/file/mds/mds.go | 6 +--
 pkg/operator/ceph/file/mds/spec.go | 10 ++---
 pkg/operator/ceph/object/rgw.go | 6 +--
 pkg/operator/ceph/pool/controller.go | 2 +-
 pkg/operator/k8sutil/deployment.go | 42 ++++++++------------
 pkg/operator/k8sutil/deployment_test.go | 10 ++---
 tests/integration/ceph_upgrade_test.go | 13 +++---
 20 files changed, 67 insertions(+), 76 deletions(-)

diff --git a/pkg/daemon/ceph/osd/remove.go b/pkg/daemon/ceph/osd/remove.go index a3ebc6aeed2c..821a35f713c8 100644 --- a/pkg/daemon/ceph/osd/remove.go +++ b/pkg/daemon/ceph/osd/remove.go @@ -87,7 +87,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf logger.Errorf("failed to fetch the deployment %q. %v", deploymentName, err) } else { logger.Infof("removing the OSD deployment %q", deploymentName) - if err := k8sutil.DeleteDeployment(clusterdContext.Clientset, clusterInfo.Namespace, deploymentName); err != nil { + if err := k8sutil.DeleteDeployment(clusterInfo.Context, clusterdContext.Clientset, clusterInfo.Namespace, deploymentName); err != nil { if err != nil { // Continue purging the OSD even if the deployment fails to be deleted logger.Errorf("failed to delete deployment for OSD %d. 
%v", osdID, err) diff --git a/pkg/operator/ceph/cluster/mgr/mgr.go b/pkg/operator/ceph/cluster/mgr/mgr.go index 518f87109736..d211bc891585 100644 --- a/pkg/operator/ceph/cluster/mgr/mgr.go +++ b/pkg/operator/ceph/cluster/mgr/mgr.go @@ -170,7 +170,7 @@ func (c *Cluster) Start() error { // If the mgr is newly created, wait for it to start before continuing with the service and // module configuration for _, d := range deploymentsToWaitFor { - if err := waitForDeploymentToStart(c.context, d); err != nil { + if err := waitForDeploymentToStart(c.clusterInfo.Context, c.context, d); err != nil { return errors.Wrapf(err, "failed to wait for mgr %q to start", d.Name) } } diff --git a/pkg/operator/ceph/cluster/mgr/mgr_test.go b/pkg/operator/ceph/cluster/mgr/mgr_test.go index d4ad1ddc372f..28fe2f152c14 100644 --- a/pkg/operator/ceph/cluster/mgr/mgr_test.go +++ b/pkg/operator/ceph/cluster/mgr/mgr_test.go @@ -57,7 +57,7 @@ func TestStartMgr(t *testing.T) { return "{\"key\":\"mysecurekey\"}", nil }, } - waitForDeploymentToStart = func(clusterdContext *clusterd.Context, deployment *apps.Deployment) error { + waitForDeploymentToStart = func(ctx context.Context, clusterdContext *clusterd.Context, deployment *apps.Deployment) error { logger.Infof("simulated mgr deployment starting") return nil } diff --git a/pkg/operator/ceph/cluster/mon/mon.go b/pkg/operator/ceph/cluster/mon/mon.go index a5e793404744..aa722febb905 100644 --- a/pkg/operator/ceph/cluster/mon/mon.go +++ b/pkg/operator/ceph/cluster/mon/mon.go @@ -673,7 +673,7 @@ func scheduleMonitor(c *Cluster, mon *monConfig) (*apps.Deployment, error) { logger.Infof("created canary deployment %s", d.Name) break } else if kerrors.IsAlreadyExists(err) { - if err := k8sutil.DeleteDeployment(c.context.Clientset, c.Namespace, d.Name); err != nil { + if err := k8sutil.DeleteDeployment(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, d.Name); err != nil { return nil, errors.Wrapf(err, "failed to delete canary deployment %s", d.Name) } logger.Infof("deleted existing canary deployment %s", d.Name) @@ -782,7 +782,7 @@ func (c *Cluster) initMonIPs(mons []*monConfig) error { // Delete mon canary deployments (and associated PVCs) using deployment labels // to select this kind of temporary deployments func (c *Cluster) removeCanaryDeployments() { - canaryDeployments, err := k8sutil.GetDeployments(c.context.Clientset, c.Namespace, "app=rook-ceph-mon,mon_canary=true") + canaryDeployments, err := k8sutil.GetDeployments(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, "app=rook-ceph-mon,mon_canary=true") if err != nil { logger.Warningf("failed to get the list of monitor canary deployments. %v", err) return diff --git a/pkg/operator/ceph/cluster/mon/spec.go b/pkg/operator/ceph/cluster/mon/spec.go index 45541e2da26e..6316560207d7 100644 --- a/pkg/operator/ceph/cluster/mon/spec.go +++ b/pkg/operator/ceph/cluster/mon/spec.go @@ -380,6 +380,6 @@ func UpdateCephDeploymentAndWait(context *clusterd.Context, clusterInfo *client. 
return nil } - err := k8sutil.UpdateDeploymentAndWait(context, deployment, clusterInfo.Namespace, callback) + err := k8sutil.UpdateDeploymentAndWait(clusterInfo.Context, context, deployment, clusterInfo.Namespace, callback) return err } diff --git a/pkg/operator/ceph/cluster/osd/create.go b/pkg/operator/ceph/cluster/osd/create.go index 3684f7105c5e..173cecab0699 100644 --- a/pkg/operator/ceph/cluster/osd/create.go +++ b/pkg/operator/ceph/cluster/osd/create.go @@ -383,7 +383,7 @@ func createDaemonOnPVC(c *Cluster, osd OSDInfo, pvcName string, config *provisio message := fmt.Sprintf("Processing OSD %d on PVC %q", osd.ID, pvcName) updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message) - _, err = k8sutil.CreateDeployment(c.context.Clientset, d) + _, err = k8sutil.CreateDeployment(c.clusterInfo.Context, c.context.Clientset, d) return errors.Wrapf(err, "failed to create deployment for OSD %d on PVC %q", osd.ID, pvcName) } @@ -396,6 +396,6 @@ func createDaemonOnNode(c *Cluster, osd OSDInfo, nodeName string, config *provis message := fmt.Sprintf("Processing OSD %d on node %q", osd.ID, nodeName) updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message) - _, err = k8sutil.CreateDeployment(c.context.Clientset, d) + _, err = k8sutil.CreateDeployment(c.clusterInfo.Context, c.context.Clientset, d) return errors.Wrapf(err, "failed to create deployment for OSD %d on node %q", osd.ID, nodeName) } diff --git a/pkg/operator/ceph/cluster/osd/health.go b/pkg/operator/ceph/cluster/osd/health.go index 6bff421852fe..b8319bd0f463 100644 --- a/pkg/operator/ceph/cluster/osd/health.go +++ b/pkg/operator/ceph/cluster/osd/health.go @@ -156,7 +156,7 @@ func (m *OSDHealthMonitor) checkOSDDump() error { func (m *OSDHealthMonitor) removeOSDDeploymentIfSafeToDestroy(outOSDid int) error { label := fmt.Sprintf("ceph-osd-id=%d", outOSDid) - dp, err := k8sutil.GetDeployments(m.context.Clientset, m.clusterInfo.Namespace, label) + dp, err := k8sutil.GetDeployments(m.clusterInfo.Context, m.context.Clientset, m.clusterInfo.Namespace, label) if err != nil { if kerrors.IsNotFound(err) { return nil @@ -175,7 +175,7 @@ func (m *OSDHealthMonitor) removeOSDDeploymentIfSafeToDestroy(outOSDid int) erro currentTime := time.Now().UTC() if podDeletionTimeStamp.Before(currentTime) { logger.Infof("osd.%d is 'safe-to-destroy'. 
removing the osd deployment.", outOSDid) - if err := k8sutil.DeleteDeployment(m.context.Clientset, dp.Items[0].Namespace, dp.Items[0].Name); err != nil { + if err := k8sutil.DeleteDeployment(m.clusterInfo.Context, m.context.Clientset, dp.Items[0].Namespace, dp.Items[0].Name); err != nil { return errors.Wrapf(err, "failed to delete osd deployment %s", dp.Items[0].Name) } } diff --git a/pkg/operator/ceph/cluster/osd/update.go b/pkg/operator/ceph/cluster/osd/update.go index 2658c50ef78f..0790aed35604 100644 --- a/pkg/operator/ceph/cluster/osd/update.go +++ b/pkg/operator/ceph/cluster/osd/update.go @@ -178,7 +178,7 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) { // when waiting on deployments to be updated, only list OSDs we intend to update specifically by ID listFunc := c.cluster.getFuncToListDeploymentsWithIDs(listIDs) - failures := updateMultipleDeploymentsAndWaitFunc(c.cluster.context.Clientset, updatedDeployments, listFunc) + failures := updateMultipleDeploymentsAndWaitFunc(c.cluster.clusterInfo.Context, c.cluster.context.Clientset, updatedDeployments, listFunc) for _, f := range failures { errs.addError("%v", errors.Wrapf(f.Error, "failed to update OSD deployment %q", f.ResourceName)) } diff --git a/pkg/operator/ceph/cluster/osd/update_test.go b/pkg/operator/ceph/cluster/osd/update_test.go index 7c9a8f86315e..3c7e952a684c 100644 --- a/pkg/operator/ceph/cluster/osd/update_test.go +++ b/pkg/operator/ceph/cluster/osd/update_test.go @@ -132,6 +132,7 @@ func Test_updateExistingOSDs(t *testing.T) { updateMultipleDeploymentsAndWaitFunc = func( + ctx context.Context, clientset kubernetes.Interface, deployments []*appsv1.Deployment, listFunc func() (*appsv1.DeploymentList, error), diff --git a/pkg/operator/ceph/csi/controller.go b/pkg/operator/ceph/csi/controller.go index f442772a4114..4cf7f0727249 100644 --- a/pkg/operator/ceph/csi/controller.go +++ b/pkg/operator/ceph/csi/controller.go @@ -160,7 +160,7 @@ func (r *ReconcileCSI) reconcile(request reconcile.Request) (reconcile.Result, e return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to get server version") } - ownerRef, err := k8sutil.GetDeploymentOwnerReference(r.context.Clientset, os.Getenv(k8sutil.PodNameEnvVar), r.opConfig.OperatorNamespace) + ownerRef, err := k8sutil.GetDeploymentOwnerReference(r.opManagerContext, r.context.Clientset, os.Getenv(k8sutil.PodNameEnvVar), r.opConfig.OperatorNamespace) if err != nil { logger.Warningf("could not find deployment owner reference to assign to csi drivers. 
%v", err) } @@ -177,7 +177,7 @@ func (r *ReconcileCSI) reconcile(request reconcile.Request) (reconcile.Result, e return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed creating csi config map") } - err = peermap.CreateOrUpdateConfig(r.context, &peermap.PeerIDMappings{}) + err = peermap.CreateOrUpdateConfig(r.opManagerContext, r.context, &peermap.PeerIDMappings{}) if err != nil { return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to create pool ID mapping config map") } diff --git a/pkg/operator/ceph/csi/peermap/config.go b/pkg/operator/ceph/csi/peermap/config.go index 927c36dd2816..2f27a459f591 100644 --- a/pkg/operator/ceph/csi/peermap/config.go +++ b/pkg/operator/ceph/csi/peermap/config.go @@ -135,7 +135,7 @@ func toObj(in string) (PeerIDMappings, error) { return mappings, nil } -func ReconcilePoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclient.ClusterInfo, pool *cephv1.CephBlockPool) error { +func ReconcilePoolIDMap(ctx context.Context, clusterContext *clusterd.Context, clusterInfo *cephclient.ClusterInfo, pool *cephv1.CephBlockPool) error { if pool.Spec.Mirroring.Peers == nil { logger.Infof("no peer secrets added in ceph block pool %q. skipping pool ID mappings with peer cluster", pool.Name) return nil @@ -146,7 +146,7 @@ func ReconcilePoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclien return errors.Wrapf(err, "failed to get peer pool ID mappings for the pool %q", pool.Name) } - err = CreateOrUpdateConfig(clusterContext, mappings) + err = CreateOrUpdateConfig(ctx, clusterContext, mappings) if err != nil { return errors.Wrapf(err, "failed to create or update peer pool ID mappings configMap for the pool %q", pool.Name) } @@ -236,8 +236,7 @@ func getClusterPoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclie return mappings, nil } -func CreateOrUpdateConfig(clusterContext *clusterd.Context, mappings *PeerIDMappings) error { - ctx := context.TODO() +func CreateOrUpdateConfig(ctx context.Context, clusterContext *clusterd.Context, mappings *PeerIDMappings) error { data, err := mappings.String() if err != nil { return errors.Wrap(err, "failed to convert peer cluster mappings struct to string") @@ -251,7 +250,7 @@ func CreateOrUpdateConfig(clusterContext *clusterd.Context, mappings *PeerIDMapp if err != nil { if kerrors.IsNotFound(err) { // Create new configMap - return createConfig(clusterContext, request, data) + return createConfig(ctx, clusterContext, request, data) } return errors.Wrapf(err, "failed to get existing mapping config map %q", existingConfigMap.Name) } @@ -295,7 +294,7 @@ func UpdateExistingData(existingMappings, newMappings *PeerIDMappings) (string, return data, nil } -func createConfig(clusterContext *clusterd.Context, request types.NamespacedName, data string) error { +func createConfig(ctx context.Context, clusterContext *clusterd.Context, request types.NamespacedName, data string) error { newConfigMap := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: request.Name, @@ -308,7 +307,7 @@ func createConfig(clusterContext *clusterd.Context, request types.NamespacedName // Get Operator owner reference operatorPodName := os.Getenv(k8sutil.PodNameEnvVar) - ownerRef, err := k8sutil.GetDeploymentOwnerReference(clusterContext.Clientset, operatorPodName, request.Namespace) + ownerRef, err := k8sutil.GetDeploymentOwnerReference(ctx, clusterContext.Clientset, operatorPodName, request.Namespace) if err != nil { return errors.Wrap(err, "failed to get operator owner reference") } @@ -325,7 +324,7 @@ func 
createConfig(clusterContext *clusterd.Context, request types.NamespacedName return errors.Wrapf(err, "failed to set owner reference on configMap %q", newConfigMap.Name) } - err = clusterContext.Client.Create(context.TODO(), newConfigMap) + err = clusterContext.Client.Create(ctx, newConfigMap) if err != nil { return errors.Wrapf(err, "failed to create mapping configMap %q", newConfigMap.Name) } diff --git a/pkg/operator/ceph/csi/peermap/config_test.go b/pkg/operator/ceph/csi/peermap/config_test.go index 1878f54b69cc..98f7a36d828d 100644 --- a/pkg/operator/ceph/csi/peermap/config_test.go +++ b/pkg/operator/ceph/csi/peermap/config_test.go @@ -389,7 +389,7 @@ func TestCreateOrUpdateConfig(t *testing.T) { assert.NoError(t, err) // Create empty ID mapping configMap - err = CreateOrUpdateConfig(fakeContext, &PeerIDMappings{}) + err = CreateOrUpdateConfig(context.TODO(), fakeContext, &PeerIDMappings{}) assert.NoError(t, err) validateConfig(t, fakeContext, PeerIDMappings{}) @@ -405,7 +405,7 @@ func TestCreateOrUpdateConfig(t *testing.T) { }, } - err = CreateOrUpdateConfig(fakeContext, actualMappings) + err = CreateOrUpdateConfig(context.TODO(), fakeContext, actualMappings) assert.NoError(t, err) //validateConfig(t, fakeContext, actualMappings) @@ -420,7 +420,7 @@ func TestCreateOrUpdateConfig(t *testing.T) { }, }) - err = CreateOrUpdateConfig(fakeContext, &mappings) + err = CreateOrUpdateConfig(context.TODO(), fakeContext, &mappings) assert.NoError(t, err) validateConfig(t, fakeContext, mappings) } diff --git a/pkg/operator/ceph/csi/spec.go b/pkg/operator/ceph/csi/spec.go index c7ef6591160d..581dfbcc3b7f 100644 --- a/pkg/operator/ceph/csi/spec.go +++ b/pkg/operator/ceph/csi/spec.go @@ -439,7 +439,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI if err != nil { return errors.Wrapf(err, "failed to apply network config to rbd plugin provisioner deployment %q", rbdProvisionerDeployment.Name) } - _, err = k8sutil.CreateOrUpdateDeployment(r.context.Clientset, rbdProvisionerDeployment) + _, err = k8sutil.CreateOrUpdateDeployment(r.opManagerContext, r.context.Clientset, rbdProvisionerDeployment) if err != nil { return errors.Wrapf(err, "failed to start rbd provisioner deployment %q", rbdProvisionerDeployment.Name) } @@ -508,7 +508,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI if err != nil { return errors.Wrapf(err, "failed to apply network config to cephfs plugin provisioner deployment %q", cephfsProvisionerDeployment.Name) } - _, err = k8sutil.CreateOrUpdateDeployment(r.context.Clientset, cephfsProvisionerDeployment) + _, err = k8sutil.CreateOrUpdateDeployment(r.opManagerContext, r.context.Clientset, cephfsProvisionerDeployment) if err != nil { return errors.Wrapf(err, "failed to start cephfs provisioner deployment %q", cephfsProvisionerDeployment.Name) } @@ -576,7 +576,7 @@ func (r *ReconcileCSI) deleteCSIDriverResources(ver *version.Info, daemonset, de succeeded = false } - err = k8sutil.DeleteDeployment(r.context.Clientset, r.opConfig.OperatorNamespace, deployment) + err = k8sutil.DeleteDeployment(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, deployment) if err != nil { logger.Errorf("failed to delete the %q. 
%v", deployment, err) succeeded = false diff --git a/pkg/operator/ceph/file/mds/mds.go b/pkg/operator/ceph/file/mds/mds.go index 4fb36da530a3..546afe68530e 100644 --- a/pkg/operator/ceph/file/mds/mds.go +++ b/pkg/operator/ceph/file/mds/mds.go @@ -298,7 +298,7 @@ func (c *Cluster) upgradeMDS() error { func (c *Cluster) scaleDownDeployments(replicas int32, activeCount int32, desiredDeployments map[string]bool, delete bool) error { // Remove extraneous mds deployments if they exist - deps, err := getMdsDeployments(c.context, c.fs.Namespace, c.fs.Name) + deps, err := getMdsDeployments(c.clusterInfo.Context, c.context, c.fs.Namespace, c.fs.Name) if err != nil { return errors.Wrapf(err, fmt.Sprintf("cannot verify the removal of extraneous mds deployments for filesystem %s. ", c.fs.Name)+ @@ -331,13 +331,13 @@ func (c *Cluster) scaleDownDeployments(replicas int32, activeCount int32, desire localdeployment := d if !delete { // stop mds daemon only by scaling deployment replicas to 0 - if err := scaleMdsDeployment(c.context, c.fs.Namespace, &localdeployment, 0); err != nil { + if err := scaleMdsDeployment(c.clusterInfo.Context, c.context, c.fs.Namespace, &localdeployment, 0); err != nil { errCount++ logger.Errorf("failed to scale mds deployment %q. %v", localdeployment.GetName(), err) } continue } - if err := deleteMdsDeployment(c.context, c.fs.Namespace, &localdeployment); err != nil { + if err := deleteMdsDeployment(c.clusterInfo.Context, c.context, c.fs.Namespace, &localdeployment); err != nil { errCount++ logger.Errorf("failed to delete mds deployment. %v", err) } diff --git a/pkg/operator/ceph/file/mds/spec.go b/pkg/operator/ceph/file/mds/spec.go index 56d1466608e0..aa9f17abb6c5 100644 --- a/pkg/operator/ceph/file/mds/spec.go +++ b/pkg/operator/ceph/file/mds/spec.go @@ -160,17 +160,16 @@ func (c *Cluster) podLabels(mdsConfig *mdsConfig, includeNewLabels bool) map[str return labels } -func getMdsDeployments(context *clusterd.Context, namespace, fsName string) (*apps.DeploymentList, error) { +func getMdsDeployments(ctx context.Context, context *clusterd.Context, namespace, fsName string) (*apps.DeploymentList, error) { fsLabelSelector := fmt.Sprintf("rook_file_system=%s", fsName) - deps, err := k8sutil.GetDeployments(context.Clientset, namespace, fsLabelSelector) + deps, err := k8sutil.GetDeployments(ctx, context.Clientset, namespace, fsLabelSelector) if err != nil { return nil, errors.Wrapf(err, "could not get deployments for filesystem %s (matching label selector %q)", fsName, fsLabelSelector) } return deps, nil } -func deleteMdsDeployment(clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment) error { - ctx := context.TODO() +func deleteMdsDeployment(ctx context.Context, clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment) error { // Delete the mds deployment logger.Infof("deleting mds deployment %s", deployment.Name) var gracePeriod int64 @@ -182,8 +181,7 @@ func deleteMdsDeployment(clusterdContext *clusterd.Context, namespace string, de return nil } -func scaleMdsDeployment(clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment, replicas int32) error { - ctx := context.TODO() +func scaleMdsDeployment(ctx context.Context, clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment, replicas int32) error { // scale mds deployment logger.Infof("scaling mds deployment %q to %d replicas", deployment.Name, replicas) d, err := clusterdContext.Clientset.AppsV1().Deployments(namespace).Get(ctx, 
deployment.GetName(), metav1.GetOptions{}) diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go index 2572b0a9980e..6cb50cb03802 100644 --- a/pkg/operator/ceph/object/rgw.go +++ b/pkg/operator/ceph/object/rgw.go @@ -174,7 +174,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string) } // scale down scenario - deps, err := k8sutil.GetDeployments(c.context.Clientset, c.store.Namespace, c.storeLabelSelector()) + deps, err := k8sutil.GetDeployments(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, c.storeLabelSelector()) if err != nil { logger.Warningf("could not get deployments for object store %q (matching label selector %q). %v", c.store.Name, c.storeLabelSelector(), err) } @@ -186,7 +186,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string) for i := 0; i < diffCount; { depIDToRemove := currentRgwInstances - 1 depNameToRemove := fmt.Sprintf("%s-%s-%s", AppName, c.store.Name, k8sutil.IndexToName(depIDToRemove)) - if err := k8sutil.DeleteDeployment(c.context.Clientset, c.store.Namespace, depNameToRemove); err != nil { + if err := k8sutil.DeleteDeployment(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, depNameToRemove); err != nil { logger.Warningf("error during deletion of deployment %q resource. %v", depNameToRemove, err) } currentRgwInstances = currentRgwInstances - 1 @@ -205,7 +205,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string) } } // verify scale down was successful - deps, err = k8sutil.GetDeployments(c.context.Clientset, c.store.Namespace, c.storeLabelSelector()) + deps, err = k8sutil.GetDeployments(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, c.storeLabelSelector()) if err != nil { logger.Warningf("could not get deployments for object store %q (matching label selector %q). %v", c.store.Name, c.storeLabelSelector(), err) } diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go index 154a21638398..0cb9cfcdc6cb 100644 --- a/pkg/operator/ceph/pool/controller.go +++ b/pkg/operator/ceph/pool/controller.go @@ -299,7 +299,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile } // ReconcilePoolIDMap updates the `rook-ceph-csi-mapping-config` with local and peer cluster pool ID map - err = peermap.ReconcilePoolIDMap(r.context, r.clusterInfo, cephBlockPool) + err = peermap.ReconcilePoolIDMap(r.opManagerContext, r.context, r.clusterInfo, cephBlockPool) if err != nil { return reconcileResponse, errors.Wrapf(err, "failed to update pool ID mapping config for the pool %q", cephBlockPool.Name) } diff --git a/pkg/operator/k8sutil/deployment.go b/pkg/operator/k8sutil/deployment.go index 34d62c0bb4c5..bb4462758c38 100644 --- a/pkg/operator/k8sutil/deployment.go +++ b/pkg/operator/k8sutil/deployment.go @@ -39,8 +39,7 @@ var ( ) // GetDeploymentImage returns the version of the image running in the pod spec for the desired container -func GetDeploymentImage(clientset kubernetes.Interface, namespace, name, container string) (string, error) { - ctx := context.TODO() +func GetDeploymentImage(ctx context.Context, clientset kubernetes.Interface, namespace, name, container string) (string, error) { d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("failed to find deployment %s. %v", name, err) @@ -66,8 +65,7 @@ func GetDeploymentSpecImage(clientset kubernetes.Interface, d appsv1.Deployment, // 2. 
verify that we can continue the update procedure // Basically, we go one resource by one and check if we can stop and then if the resource has been successfully updated // we check if we can go ahead and move to the next one. -func UpdateDeploymentAndWait(clusterContext *clusterd.Context, modifiedDeployment *appsv1.Deployment, namespace string, verifyCallback func(action string) error) error { - ctx := context.TODO() +func UpdateDeploymentAndWait(ctx context.Context, clusterContext *clusterd.Context, modifiedDeployment *appsv1.Deployment, namespace string, verifyCallback func(action string) error) error { currentDeployment, err := clusterContext.Clientset.AppsV1().Deployments(namespace).Get(ctx, modifiedDeployment.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get deployment %s. %+v", modifiedDeployment.Name, err) @@ -105,7 +103,7 @@ func UpdateDeploymentAndWait(clusterContext *clusterd.Context, modifiedDeploymen return fmt.Errorf("failed to update deployment %q. %v", modifiedDeployment.Name, err) } - if err := WaitForDeploymentToStart(clusterContext, currentDeployment); err != nil { + if err := WaitForDeploymentToStart(ctx, clusterContext, currentDeployment); err != nil { return err } @@ -116,13 +114,13 @@ func UpdateDeploymentAndWait(clusterContext *clusterd.Context, modifiedDeploymen return nil } -func WaitForDeploymentToStart(clusterdContext *clusterd.Context, deployment *appsv1.Deployment) error { +func WaitForDeploymentToStart(ctx context.Context, clusterdContext *clusterd.Context, deployment *appsv1.Deployment) error { // wait for the deployment to be restarted up to 300s sleepTime := 3 attempts := 100 for i := 0; i < attempts; i++ { // check for the status of the deployment - d, err := clusterdContext.Clientset.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{}) + d, err := clusterdContext.Clientset.AppsV1().Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get deployment %q. %v", deployment.Name, err) } @@ -188,6 +186,7 @@ func (failures *Failures) CollatedErrors() error { // Also returns a list of failures. Each failure returned includes the name of the deployment which // could not be updated and the error experienced when attempting to update the deployment. func UpdateMultipleDeployments( + ctx context.Context, clientset kubernetes.Interface, deployments []*appsv1.Deployment, ) (DeploymentsUpdated, Failures, *int32) { @@ -196,7 +195,7 @@ func UpdateMultipleDeployments( var maxProgressDeadlineSeconds *int32 for _, dep := range deployments { - oldDep, newDep, err := updateDeployment(clientset, dep) + oldDep, newDep, err := updateDeployment(ctx, clientset, dep) if err != nil { failures = append(failures, Failure{ ResourceName: dep.Name, @@ -302,11 +301,12 @@ func WaitForDeploymentsToUpdate( } func UpdateMultipleDeploymentsAndWait( + ctx context.Context, clientset kubernetes.Interface, deployments []*appsv1.Deployment, listFunc func() (*appsv1.DeploymentList, error), ) Failures { - depsUpdated, updateFailures, maxProgressDeadline := UpdateMultipleDeployments(clientset, deployments) + depsUpdated, updateFailures, maxProgressDeadline := UpdateMultipleDeployments(ctx, clientset, deployments) waitFailures := WaitForDeploymentsToUpdate(depsUpdated, maxProgressDeadline, listFunc) return append(updateFailures, waitFailures...) 
@@ -327,10 +327,10 @@ func progressDeadlineExceeded(d *appsv1.Deployment) error { } func updateDeployment( + ctx context.Context, clientset kubernetes.Interface, deployment *appsv1.Deployment, ) (oldDeployment, newDeployment *appsv1.Deployment, err error) { - ctx := context.TODO() namespace := deployment.Namespace oldDeployment, err = clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) @@ -370,9 +370,8 @@ func updateDeployment( // GetDeployments returns a list of deployment names labels matching a given selector // example of a label selector might be "app=rook-ceph-mon, mon!=b" // more: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ -func GetDeployments(clientset kubernetes.Interface, namespace, labelSelector string) (*appsv1.DeploymentList, error) { +func GetDeployments(ctx context.Context, clientset kubernetes.Interface, namespace, labelSelector string) (*appsv1.DeploymentList, error) { listOptions := metav1.ListOptions{LabelSelector: labelSelector} - ctx := context.TODO() deployments, err := clientset.AppsV1().Deployments(namespace).List(ctx, listOptions) if err != nil { return nil, fmt.Errorf("failed to list deployments with labelSelector %s: %v", labelSelector, err) @@ -381,9 +380,8 @@ func GetDeployments(clientset kubernetes.Interface, namespace, labelSelector str } // DeleteDeployment makes a best effort at deleting a deployment and its pods, then waits for them to be deleted -func DeleteDeployment(clientset kubernetes.Interface, namespace, name string) error { +func DeleteDeployment(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error { logger.Debugf("removing %s deployment if it exists", name) - ctx := context.TODO() deleteAction := func(options *metav1.DeleteOptions) error { return clientset.AppsV1().Deployments(namespace).Delete(ctx, name, *options) } @@ -395,8 +393,7 @@ func DeleteDeployment(clientset kubernetes.Interface, namespace, name string) er } // GetDeploymentOwnerReference returns an OwnerReference to the deployment that is running the given pod name -func GetDeploymentOwnerReference(clientset kubernetes.Interface, podName, namespace string) (*metav1.OwnerReference, error) { - ctx := context.TODO() +func GetDeploymentOwnerReference(ctx context.Context, clientset kubernetes.Interface, podName, namespace string) (*metav1.OwnerReference, error) { var deploymentRef *metav1.OwnerReference pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { @@ -424,8 +421,7 @@ func GetDeploymentOwnerReference(clientset kubernetes.Interface, podName, namesp // WaitForDeploymentImage waits for all deployments with the given labels are running. // WARNING:This is currently only useful for testing! 
-func WaitForDeploymentImage(clientset kubernetes.Interface, namespace, label, container string, initContainer bool, desiredImage string) error { - ctx := context.TODO() +func WaitForDeploymentImage(ctx context.Context, clientset kubernetes.Interface, namespace, label, container string, initContainer bool, desiredImage string) error { sleepTime := 3 attempts := 120 for i := 0; i < attempts; i++ { @@ -515,9 +511,7 @@ func addLabel(key, value string, labels map[string]string) { } // CreateDeployment creates a deployment with a last applied hash annotation added -func CreateDeployment(clientset kubernetes.Interface, dep *appsv1.Deployment) (*appsv1.Deployment, error) { - ctx := context.TODO() - +func CreateDeployment(ctx context.Context, clientset kubernetes.Interface, dep *appsv1.Deployment) (*appsv1.Deployment, error) { // Set hash annotation to the newly generated deployment err := patch.DefaultAnnotator.SetLastAppliedAnnotation(dep) if err != nil { @@ -527,10 +521,8 @@ func CreateDeployment(clientset kubernetes.Interface, dep *appsv1.Deployment) (* return clientset.AppsV1().Deployments(dep.Namespace).Create(ctx, dep, metav1.CreateOptions{}) } -func CreateOrUpdateDeployment(clientset kubernetes.Interface, dep *appsv1.Deployment) (*appsv1.Deployment, error) { - ctx := context.TODO() - - newDep, err := CreateDeployment(clientset, dep) +func CreateOrUpdateDeployment(ctx context.Context, clientset kubernetes.Interface, dep *appsv1.Deployment) (*appsv1.Deployment, error) { + newDep, err := CreateDeployment(ctx, clientset, dep) if err != nil { if k8serrors.IsAlreadyExists(err) { // annotation was added in CreateDeployment to dep passed by reference diff --git a/pkg/operator/k8sutil/deployment_test.go b/pkg/operator/k8sutil/deployment_test.go index 0907d8ec7a05..e604b99ef9fb 100644 --- a/pkg/operator/k8sutil/deployment_test.go +++ b/pkg/operator/k8sutil/deployment_test.go @@ -146,7 +146,7 @@ func TestUpdateMultipleDeploymentsAndWait(t *testing.T) { timesCalled++ return l, nil } - failures = UpdateMultipleDeploymentsAndWait(clientset, deployments, listFunc) + failures = UpdateMultipleDeploymentsAndWait(context.TODO(), clientset, deployments, listFunc) assert.Len(t, failures, 3) assert.ElementsMatch(t, []string{failures[0].ResourceName, failures[1].ResourceName, failures[2].ResourceName}, @@ -191,7 +191,7 @@ func TestUpdateMultipleDeployments(t *testing.T) { t.Run("no deployments to be updated", func(t *testing.T) { clientset = fake.NewSimpleClientset() deployments = []*appsv1.Deployment{} - deploymentsUpdated, failures, pds = UpdateMultipleDeployments(clientset, deployments) + deploymentsUpdated, failures, pds = UpdateMultipleDeployments(context.TODO(), clientset, deployments) assert.Len(t, deploymentsUpdated, 0) assert.Len(t, failures, 0) }) @@ -206,7 +206,7 @@ func TestUpdateMultipleDeployments(t *testing.T) { modifiedDeployment("d2", nil), modifiedDeployment("d3", nil), } - deploymentsUpdated, failures, pds = UpdateMultipleDeployments(clientset, deployments) + deploymentsUpdated, failures, pds = UpdateMultipleDeployments(context.TODO(), clientset, deployments) assert.Len(t, deploymentsUpdated, 3) assert.Len(t, failures, 0) assert.Contains(t, deploymentsUpdated, "d1") @@ -226,7 +226,7 @@ func TestUpdateMultipleDeployments(t *testing.T) { // d2 from before should also not be updated d3, // should be updated } - deploymentsUpdated, failures, pds = UpdateMultipleDeployments(clientset, deployments) + deploymentsUpdated, failures, pds = UpdateMultipleDeployments(context.TODO(), clientset, 
deployments) assert.Len(t, deploymentsUpdated, 1) assert.Len(t, failures, 0) assert.Contains(t, deploymentsUpdated, "d3") @@ -243,7 +243,7 @@ func TestUpdateMultipleDeployments(t *testing.T) { modifiedDeployment("d3", newInt32(30)), modifiedDeployment("d4", newInt32(30)), } - deploymentsUpdated, failures, pds = UpdateMultipleDeployments(clientset, deployments) + deploymentsUpdated, failures, pds = UpdateMultipleDeployments(context.TODO(), clientset, deployments) assert.Len(t, deploymentsUpdated, 2) assert.Len(t, failures, 2) assert.Contains(t, deploymentsUpdated, "d1") diff --git a/tests/integration/ceph_upgrade_test.go b/tests/integration/ceph_upgrade_test.go index 6ba70cd197cf..734d834a0e45 100644 --- a/tests/integration/ceph_upgrade_test.go +++ b/tests/integration/ceph_upgrade_test.go @@ -17,6 +17,7 @@ limitations under the License. package integration import ( + "context" "fmt" "strings" "testing" @@ -272,7 +273,7 @@ func (s *UpgradeSuite) deployClusterforUpgrade(objectStoreName, objectUserID, me cephfsFilesToRead := []string{} // Get some info about the currently deployed OSDs to determine later if they are all updated - osdDepList, err := k8sutil.GetDeployments(s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") + osdDepList, err := k8sutil.GetDeployments(context.TODO(), s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") require.NoError(s.T(), err) osdDeps := osdDepList.Items numOSDs := len(osdDeps) // there should be this many upgraded OSDs @@ -291,7 +292,7 @@ func (s *UpgradeSuite) gatherLogs(systemNamespace, testSuffix string) { } func (s *UpgradeSuite) upgradeCephVersion(newCephImage string, numOSDs int) { - osdDepList, err := k8sutil.GetDeployments(s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") + osdDepList, err := k8sutil.GetDeployments(context.TODO(), s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") require.NoError(s.T(), err) oldCephVersion := osdDepList.Items[0].Labels["ceph-version"] // upgraded OSDs should not have this version label @@ -306,24 +307,24 @@ func (s *UpgradeSuite) verifyOperatorImage(expectedImage string) { systemNamespace := installer.SystemNamespace(s.namespace) // verify that the operator spec is updated - version, err := k8sutil.GetDeploymentImage(s.k8sh.Clientset, systemNamespace, operatorContainer, operatorContainer) + version, err := k8sutil.GetDeploymentImage(context.TODO(), s.k8sh.Clientset, systemNamespace, operatorContainer, operatorContainer) assert.NoError(s.T(), err) assert.Equal(s.T(), "rook/ceph:"+expectedImage, version) } func (s *UpgradeSuite) verifyRookUpgrade(numOSDs int) { // Get some info about the currently deployed mons to determine later if they are all updated - monDepList, err := k8sutil.GetDeployments(s.k8sh.Clientset, s.namespace, "app=rook-ceph-mon") + monDepList, err := k8sutil.GetDeployments(context.TODO(), s.k8sh.Clientset, s.namespace, "app=rook-ceph-mon") require.NoError(s.T(), err) require.Equal(s.T(), s.settings.Mons, len(monDepList.Items), monDepList.Items) // Get some info about the currently deployed mgr to determine later if it is updated - mgrDepList, err := k8sutil.GetDeployments(s.k8sh.Clientset, s.namespace, "app=rook-ceph-mgr") + mgrDepList, err := k8sutil.GetDeployments(context.TODO(), s.k8sh.Clientset, s.namespace, "app=rook-ceph-mgr") require.NoError(s.T(), err) require.Equal(s.T(), 1, len(mgrDepList.Items)) // Get some info about the currently deployed OSDs to determine later if they are all updated - osdDepList, err := k8sutil.GetDeployments(s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") + 
osdDepList, err := k8sutil.GetDeployments(context.TODO(), s.k8sh.Clientset, s.namespace, "app=rook-ceph-osd") require.NoError(s.T(), err) require.NotZero(s.T(), len(osdDepList.Items)) require.Equal(s.T(), numOSDs, len(osdDepList.Items), osdDepList.Items)
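
For reviewers, a minimal standalone sketch of the calling pattern this patch enables might look like the following. It is not part of the patch: the lower-case getDeploymentImage helper, the fake clientset, and the "rook-ceph"/"rook-ceph-operator" names are illustrative stand-ins; the point is that the caller's context is passed down into the client-go call instead of context.TODO(), so cancelling or timing out the caller also cancels the in-flight API request.

package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// getDeploymentImage mirrors the post-patch convention: the caller's context
// is threaded into the client-go request, so the request is cancellable.
func getDeploymentImage(ctx context.Context, clientset kubernetes.Interface, namespace, name, container string) (string, error) {
	d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to find deployment %s: %v", name, err)
	}
	for _, c := range d.Spec.Template.Spec.Containers {
		if c.Name == container {
			return c.Image, nil
		}
	}
	return "", fmt.Errorf("container %s not found in deployment %s", container, name)
}

func main() {
	// A bounded context: when it expires or is cancelled (e.g. on operator
	// shutdown), the API call above returns promptly instead of hanging.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// The fake clientset is used only so this sketch runs without a real
	// cluster; in the operator this would be the shared clientset.
	var clientset kubernetes.Interface = fake.NewSimpleClientset()

	if _, err := getDeploymentImage(ctx, clientset, "rook-ceph", "rook-ceph-operator", "rook-ceph-operator"); err != nil {
		fmt.Println(err)
	}
}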