core: add context parameter to k8sutil deployment #9160

Merged: 1 commit, Nov 15, 2021
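This PR threads a caller-supplied context.Context through the k8sutil deployment helpers instead of letting each helper create its own context.TODO(). The helper bodies themselves are not part of this excerpt, so the sketch below only reflects the shape implied by the call sites in the diff; the parameter names and return types are assumptions, not the verified pkg/operator/k8sutil code.

// Sketch of the updated pkg/operator/k8sutil helpers as implied by the call
// sites in this diff; exact names and details in the real package may differ.
package k8sutil

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// CreateDeployment creates a deployment using the caller's context rather
// than an internally created context.TODO().
func CreateDeployment(ctx context.Context, clientset kubernetes.Interface, dep *appsv1.Deployment) (*appsv1.Deployment, error) {
	return clientset.AppsV1().Deployments(dep.Namespace).Create(ctx, dep, metav1.CreateOptions{})
}

// DeleteDeployment deletes the named deployment, honoring ctx cancellation.
func DeleteDeployment(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	return clientset.AppsV1().Deployments(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}

// GetDeployments lists deployments matching the given label selector.
func GetDeployments(ctx context.Context, clientset kubernetes.Interface, namespace, labelSelector string) (*appsv1.DeploymentList, error) {
	return clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
}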
2 changes: 1 addition & 1 deletion pkg/daemon/ceph/osd/remove.go
@@ -87,7 +87,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
logger.Errorf("failed to fetch the deployment %q. %v", deploymentName, err)
} else {
logger.Infof("removing the OSD deployment %q", deploymentName)
if err := k8sutil.DeleteDeployment(clusterdContext.Clientset, clusterInfo.Namespace, deploymentName); err != nil {
if err := k8sutil.DeleteDeployment(clusterInfo.Context, clusterdContext.Clientset, clusterInfo.Namespace, deploymentName); err != nil {
if err != nil {
// Continue purging the OSD even if the deployment fails to be deleted
logger.Errorf("failed to delete deployment for OSD %d. %v", osdID, err)
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
@@ -170,7 +170,7 @@ func (c *Cluster) Start() error {
// If the mgr is newly created, wait for it to start before continuing with the service and
// module configuration
for _, d := range deploymentsToWaitFor {
if err := waitForDeploymentToStart(c.context, d); err != nil {
if err := waitForDeploymentToStart(c.clusterInfo.Context, c.context, d); err != nil {
return errors.Wrapf(err, "failed to wait for mgr %q to start", d.Name)
}
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr_test.go
@@ -57,7 +57,7 @@ func TestStartMgr(t *testing.T) {
return "{\"key\":\"mysecurekey\"}", nil
},
}
waitForDeploymentToStart = func(clusterdContext *clusterd.Context, deployment *apps.Deployment) error {
waitForDeploymentToStart = func(ctx context.Context, clusterdContext *clusterd.Context, deployment *apps.Deployment) error {
logger.Infof("simulated mgr deployment starting")
return nil
}
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/mon/mon.go
@@ -673,7 +673,7 @@ func scheduleMonitor(c *Cluster, mon *monConfig) (*apps.Deployment, error) {
logger.Infof("created canary deployment %s", d.Name)
break
} else if kerrors.IsAlreadyExists(err) {
if err := k8sutil.DeleteDeployment(c.context.Clientset, c.Namespace, d.Name); err != nil {
if err := k8sutil.DeleteDeployment(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, d.Name); err != nil {
return nil, errors.Wrapf(err, "failed to delete canary deployment %s", d.Name)
}
logger.Infof("deleted existing canary deployment %s", d.Name)
@@ -782,7 +782,7 @@ func (c *Cluster) initMonIPs(mons []*monConfig) error {
// Delete mon canary deployments (and associated PVCs) using deployment labels
// to select this kind of temporary deployments
func (c *Cluster) removeCanaryDeployments() {
canaryDeployments, err := k8sutil.GetDeployments(c.context.Clientset, c.Namespace, "app=rook-ceph-mon,mon_canary=true")
canaryDeployments, err := k8sutil.GetDeployments(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, "app=rook-ceph-mon,mon_canary=true")
if err != nil {
logger.Warningf("failed to get the list of monitor canary deployments. %v", err)
return
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mon/spec.go
@@ -380,6 +380,6 @@ func UpdateCephDeploymentAndWait(context *clusterd.Context, clusterInfo *client.
return nil
}

err := k8sutil.UpdateDeploymentAndWait(context, deployment, clusterInfo.Namespace, callback)
err := k8sutil.UpdateDeploymentAndWait(clusterInfo.Context, context, deployment, clusterInfo.Namespace, callback)
return err
}
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/create.go
@@ -383,7 +383,7 @@ func createDaemonOnPVC(c *Cluster, osd OSDInfo, pvcName string, config *provisio
message := fmt.Sprintf("Processing OSD %d on PVC %q", osd.ID, pvcName)
updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)

_, err = k8sutil.CreateDeployment(c.context.Clientset, d)
_, err = k8sutil.CreateDeployment(c.clusterInfo.Context, c.context.Clientset, d)
return errors.Wrapf(err, "failed to create deployment for OSD %d on PVC %q", osd.ID, pvcName)
}

@@ -396,6 +396,6 @@ func createDaemonOnNode(c *Cluster, osd OSDInfo, nodeName string, config *provis
message := fmt.Sprintf("Processing OSD %d on node %q", osd.ID, nodeName)
updateConditionFunc(c.clusterInfo.Context, c.context, c.clusterInfo.NamespacedName(), cephv1.ConditionProgressing, v1.ConditionTrue, cephv1.ClusterProgressingReason, message)

_, err = k8sutil.CreateDeployment(c.context.Clientset, d)
_, err = k8sutil.CreateDeployment(c.clusterInfo.Context, c.context.Clientset, d)
return errors.Wrapf(err, "failed to create deployment for OSD %d on node %q", osd.ID, nodeName)
}
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/health.go
@@ -156,7 +156,7 @@ func (m *OSDHealthMonitor) checkOSDDump() error {

func (m *OSDHealthMonitor) removeOSDDeploymentIfSafeToDestroy(outOSDid int) error {
label := fmt.Sprintf("ceph-osd-id=%d", outOSDid)
dp, err := k8sutil.GetDeployments(m.context.Clientset, m.clusterInfo.Namespace, label)
dp, err := k8sutil.GetDeployments(m.clusterInfo.Context, m.context.Clientset, m.clusterInfo.Namespace, label)
if err != nil {
if kerrors.IsNotFound(err) {
return nil
@@ -175,7 +175,7 @@ func (m *OSDHealthMonitor) removeOSDDeploymentIfSafeToDestroy(outOSDid int) erro
currentTime := time.Now().UTC()
if podDeletionTimeStamp.Before(currentTime) {
logger.Infof("osd.%d is 'safe-to-destroy'. removing the osd deployment.", outOSDid)
if err := k8sutil.DeleteDeployment(m.context.Clientset, dp.Items[0].Namespace, dp.Items[0].Name); err != nil {
if err := k8sutil.DeleteDeployment(m.clusterInfo.Context, m.context.Clientset, dp.Items[0].Namespace, dp.Items[0].Name); err != nil {
return errors.Wrapf(err, "failed to delete osd deployment %s", dp.Items[0].Name)
}
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/update.go
@@ -178,7 +178,7 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) {
// when waiting on deployments to be updated, only list OSDs we intend to update specifically by ID
listFunc := c.cluster.getFuncToListDeploymentsWithIDs(listIDs)

failures := updateMultipleDeploymentsAndWaitFunc(c.cluster.context.Clientset, updatedDeployments, listFunc)
failures := updateMultipleDeploymentsAndWaitFunc(c.cluster.clusterInfo.Context, c.cluster.context.Clientset, updatedDeployments, listFunc)
for _, f := range failures {
errs.addError("%v", errors.Wrapf(f.Error, "failed to update OSD deployment %q", f.ResourceName))
}
1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/osd/update_test.go
@@ -132,6 +132,7 @@ func Test_updateExistingOSDs(t *testing.T) {

updateMultipleDeploymentsAndWaitFunc =
func(
ctx context.Context,
clientset kubernetes.Interface,
deployments []*appsv1.Deployment,
listFunc func() (*appsv1.DeploymentList, error),
4 changes: 2 additions & 2 deletions pkg/operator/ceph/csi/controller.go
@@ -160,7 +160,7 @@ func (r *ReconcileCSI) reconcile(request reconcile.Request) (reconcile.Result, e
return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to get server version")
}

ownerRef, err := k8sutil.GetDeploymentOwnerReference(r.context.Clientset, os.Getenv(k8sutil.PodNameEnvVar), r.opConfig.OperatorNamespace)
ownerRef, err := k8sutil.GetDeploymentOwnerReference(r.opManagerContext, r.context.Clientset, os.Getenv(k8sutil.PodNameEnvVar), r.opConfig.OperatorNamespace)
if err != nil {
logger.Warningf("could not find deployment owner reference to assign to csi drivers. %v", err)
}
@@ -177,7 +177,7 @@ func (r *ReconcileCSI) reconcile(request reconcile.Request) (reconcile.Result, e
return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed creating csi config map")
}

err = peermap.CreateOrUpdateConfig(r.context, &peermap.PeerIDMappings{})
err = peermap.CreateOrUpdateConfig(r.opManagerContext, r.context, &peermap.PeerIDMappings{})
if err != nil {
return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed to create pool ID mapping config map")
}
15 changes: 7 additions & 8 deletions pkg/operator/ceph/csi/peermap/config.go
@@ -135,7 +135,7 @@ func toObj(in string) (PeerIDMappings, error) {
return mappings, nil
}

func ReconcilePoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclient.ClusterInfo, pool *cephv1.CephBlockPool) error {
func ReconcilePoolIDMap(ctx context.Context, clusterContext *clusterd.Context, clusterInfo *cephclient.ClusterInfo, pool *cephv1.CephBlockPool) error {
if pool.Spec.Mirroring.Peers == nil {
logger.Infof("no peer secrets added in ceph block pool %q. skipping pool ID mappings with peer cluster", pool.Name)
return nil
@@ -146,7 +146,7 @@ func ReconcilePoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclien
return errors.Wrapf(err, "failed to get peer pool ID mappings for the pool %q", pool.Name)
}

err = CreateOrUpdateConfig(clusterContext, mappings)
err = CreateOrUpdateConfig(ctx, clusterContext, mappings)
if err != nil {
return errors.Wrapf(err, "failed to create or update peer pool ID mappings configMap for the pool %q", pool.Name)
}
@@ -236,8 +236,7 @@ func getClusterPoolIDMap(clusterContext *clusterd.Context, clusterInfo *cephclie
return mappings, nil
}

func CreateOrUpdateConfig(clusterContext *clusterd.Context, mappings *PeerIDMappings) error {
ctx := context.TODO()
func CreateOrUpdateConfig(ctx context.Context, clusterContext *clusterd.Context, mappings *PeerIDMappings) error {
data, err := mappings.String()
if err != nil {
return errors.Wrap(err, "failed to convert peer cluster mappings struct to string")
@@ -251,7 +250,7 @@ func CreateOrUpdateConfig(clusterContext *clusterd.Context, mappings *PeerIDMapp
if err != nil {
if kerrors.IsNotFound(err) {
// Create new configMap
return createConfig(clusterContext, request, data)
return createConfig(ctx, clusterContext, request, data)
}
return errors.Wrapf(err, "failed to get existing mapping config map %q", existingConfigMap.Name)
}
@@ -295,7 +294,7 @@ func UpdateExistingData(existingMappings, newMappings *PeerIDMappings) (string,
return data, nil
}

func createConfig(clusterContext *clusterd.Context, request types.NamespacedName, data string) error {
func createConfig(ctx context.Context, clusterContext *clusterd.Context, request types.NamespacedName, data string) error {
newConfigMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: request.Name,
@@ -308,7 +307,7 @@ func createConfig(clusterContext *clusterd.Context, request types.NamespacedName

// Get Operator owner reference
operatorPodName := os.Getenv(k8sutil.PodNameEnvVar)
ownerRef, err := k8sutil.GetDeploymentOwnerReference(clusterContext.Clientset, operatorPodName, request.Namespace)
ownerRef, err := k8sutil.GetDeploymentOwnerReference(ctx, clusterContext.Clientset, operatorPodName, request.Namespace)
if err != nil {
return errors.Wrap(err, "failed to get operator owner reference")
}
@@ -325,7 +324,7 @@ func createConfig(clusterContext *clusterd.Context, request types.NamespacedName
return errors.Wrapf(err, "failed to set owner reference on configMap %q", newConfigMap.Name)
}

err = clusterContext.Client.Create(context.TODO(), newConfigMap)
err = clusterContext.Client.Create(ctx, newConfigMap)
if err != nil {
return errors.Wrapf(err, "failed to create mapping configMap %q", newConfigMap.Name)
}
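The peermap change shows the refactoring pattern used throughout the PR: helpers such as CreateOrUpdateConfig and createConfig stop manufacturing ctx := context.TODO() internally and instead receive the context from their caller (the CSI and pool reconcilers pass their operator-manager context, as seen in controller.go above). A minimal sketch of that pattern with stand-in names, not the actual createConfig implementation:

// Hedged sketch of the refactoring applied in config.go; types and names are
// simplified stand-ins for illustration only.
package peermapsketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Before (simplified): ctx := context.TODO() was created inside the helper.
// After (below): the caller's context is threaded straight through to the
// client call, so cancellation from the caller propagates.
func createConfig(ctx context.Context, c client.Client, cm *v1.ConfigMap) error {
	return c.Create(ctx, cm)
}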
6 changes: 3 additions & 3 deletions pkg/operator/ceph/csi/peermap/config_test.go
@@ -389,7 +389,7 @@ func TestCreateOrUpdateConfig(t *testing.T) {
assert.NoError(t, err)

// Create empty ID mapping configMap
err = CreateOrUpdateConfig(fakeContext, &PeerIDMappings{})
err = CreateOrUpdateConfig(context.TODO(), fakeContext, &PeerIDMappings{})
assert.NoError(t, err)
validateConfig(t, fakeContext, PeerIDMappings{})

@@ -405,7 +405,7 @@ func TestCreateOrUpdateConfig(t *testing.T) {
},
}

err = CreateOrUpdateConfig(fakeContext, actualMappings)
err = CreateOrUpdateConfig(context.TODO(), fakeContext, actualMappings)
assert.NoError(t, err)
//validateConfig(t, fakeContext, actualMappings)

@@ -420,7 +420,7 @@ func TestCreateOrUpdateConfig(t *testing.T) {
},
})

err = CreateOrUpdateConfig(fakeContext, &mappings)
err = CreateOrUpdateConfig(context.TODO(), fakeContext, &mappings)
assert.NoError(t, err)
validateConfig(t, fakeContext, mappings)
}
6 changes: 3 additions & 3 deletions pkg/operator/ceph/csi/spec.go
@@ -439,7 +439,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI
if err != nil {
return errors.Wrapf(err, "failed to apply network config to rbd plugin provisioner deployment %q", rbdProvisionerDeployment.Name)
}
_, err = k8sutil.CreateOrUpdateDeployment(r.context.Clientset, rbdProvisionerDeployment)
_, err = k8sutil.CreateOrUpdateDeployment(r.opManagerContext, r.context.Clientset, rbdProvisionerDeployment)
if err != nil {
return errors.Wrapf(err, "failed to start rbd provisioner deployment %q", rbdProvisionerDeployment.Name)
}
@@ -508,7 +508,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI
if err != nil {
return errors.Wrapf(err, "failed to apply network config to cephfs plugin provisioner deployment %q", cephfsProvisionerDeployment.Name)
}
_, err = k8sutil.CreateOrUpdateDeployment(r.context.Clientset, cephfsProvisionerDeployment)
_, err = k8sutil.CreateOrUpdateDeployment(r.opManagerContext, r.context.Clientset, cephfsProvisionerDeployment)
if err != nil {
return errors.Wrapf(err, "failed to start cephfs provisioner deployment %q", cephfsProvisionerDeployment.Name)
}
@@ -576,7 +576,7 @@ func (r *ReconcileCSI) deleteCSIDriverResources(ver *version.Info, daemonset, de
succeeded = false
}

err = k8sutil.DeleteDeployment(r.context.Clientset, r.opConfig.OperatorNamespace, deployment)
err = k8sutil.DeleteDeployment(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, deployment)
if err != nil {
logger.Errorf("failed to delete the %q. %v", deployment, err)
succeeded = false
6 changes: 3 additions & 3 deletions pkg/operator/ceph/file/mds/mds.go
@@ -298,7 +298,7 @@ func (c *Cluster) upgradeMDS() error {

func (c *Cluster) scaleDownDeployments(replicas int32, activeCount int32, desiredDeployments map[string]bool, delete bool) error {
// Remove extraneous mds deployments if they exist
deps, err := getMdsDeployments(c.context, c.fs.Namespace, c.fs.Name)
deps, err := getMdsDeployments(c.clusterInfo.Context, c.context, c.fs.Namespace, c.fs.Name)
if err != nil {
return errors.Wrapf(err,
fmt.Sprintf("cannot verify the removal of extraneous mds deployments for filesystem %s. ", c.fs.Name)+
@@ -331,13 +331,13 @@ func (c *Cluster) scaleDownDeployments(replicas int32, activeCount int32, desire
localdeployment := d
if !delete {
// stop mds daemon only by scaling deployment replicas to 0
if err := scaleMdsDeployment(c.context, c.fs.Namespace, &localdeployment, 0); err != nil {
if err := scaleMdsDeployment(c.clusterInfo.Context, c.context, c.fs.Namespace, &localdeployment, 0); err != nil {
errCount++
logger.Errorf("failed to scale mds deployment %q. %v", localdeployment.GetName(), err)
}
continue
}
if err := deleteMdsDeployment(c.context, c.fs.Namespace, &localdeployment); err != nil {
if err := deleteMdsDeployment(c.clusterInfo.Context, c.context, c.fs.Namespace, &localdeployment); err != nil {
errCount++
logger.Errorf("failed to delete mds deployment. %v", err)
}
10 changes: 4 additions & 6 deletions pkg/operator/ceph/file/mds/spec.go
@@ -160,17 +160,16 @@ func (c *Cluster) podLabels(mdsConfig *mdsConfig, includeNewLabels bool) map[str
return labels
}

func getMdsDeployments(context *clusterd.Context, namespace, fsName string) (*apps.DeploymentList, error) {
func getMdsDeployments(ctx context.Context, context *clusterd.Context, namespace, fsName string) (*apps.DeploymentList, error) {
fsLabelSelector := fmt.Sprintf("rook_file_system=%s", fsName)
deps, err := k8sutil.GetDeployments(context.Clientset, namespace, fsLabelSelector)
deps, err := k8sutil.GetDeployments(ctx, context.Clientset, namespace, fsLabelSelector)
if err != nil {
return nil, errors.Wrapf(err, "could not get deployments for filesystem %s (matching label selector %q)", fsName, fsLabelSelector)
}
return deps, nil
}

func deleteMdsDeployment(clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment) error {
ctx := context.TODO()
func deleteMdsDeployment(ctx context.Context, clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment) error {
// Delete the mds deployment
logger.Infof("deleting mds deployment %s", deployment.Name)
var gracePeriod int64
@@ -182,8 +181,7 @@ func deleteMdsDeployment(clusterdContext *clusterd.Context, namespace string, de
return nil
}

func scaleMdsDeployment(clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment, replicas int32) error {
ctx := context.TODO()
func scaleMdsDeployment(ctx context.Context, clusterdContext *clusterd.Context, namespace string, deployment *apps.Deployment, replicas int32) error {
// scale mds deployment
logger.Infof("scaling mds deployment %q to %d replicas", deployment.Name, replicas)
d, err := clusterdContext.Clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.GetName(), metav1.GetOptions{})
6 changes: 3 additions & 3 deletions pkg/operator/ceph/object/rgw.go
@@ -174,7 +174,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string)
}

// scale down scenario
deps, err := k8sutil.GetDeployments(c.context.Clientset, c.store.Namespace, c.storeLabelSelector())
deps, err := k8sutil.GetDeployments(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, c.storeLabelSelector())
if err != nil {
logger.Warningf("could not get deployments for object store %q (matching label selector %q). %v", c.store.Name, c.storeLabelSelector(), err)
}
@@ -186,7 +186,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string)
for i := 0; i < diffCount; {
depIDToRemove := currentRgwInstances - 1
depNameToRemove := fmt.Sprintf("%s-%s-%s", AppName, c.store.Name, k8sutil.IndexToName(depIDToRemove))
if err := k8sutil.DeleteDeployment(c.context.Clientset, c.store.Namespace, depNameToRemove); err != nil {
if err := k8sutil.DeleteDeployment(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, depNameToRemove); err != nil {
logger.Warningf("error during deletion of deployment %q resource. %v", depNameToRemove, err)
}
currentRgwInstances = currentRgwInstances - 1
@@ -205,7 +205,7 @@ func (c *clusterConfig) startRGWPods(realmName, zoneGroupName, zoneName string)
}
}
// verify scale down was successful
deps, err = k8sutil.GetDeployments(c.context.Clientset, c.store.Namespace, c.storeLabelSelector())
deps, err = k8sutil.GetDeployments(c.clusterInfo.Context, c.context.Clientset, c.store.Namespace, c.storeLabelSelector())
if err != nil {
logger.Warningf("could not get deployments for object store %q (matching label selector %q). %v", c.store.Name, c.storeLabelSelector(), err)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/pool/controller.go
@@ -299,7 +299,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile
}

// ReconcilePoolIDMap updates the `rook-ceph-csi-mapping-config` with local and peer cluster pool ID map
err = peermap.ReconcilePoolIDMap(r.context, r.clusterInfo, cephBlockPool)
err = peermap.ReconcilePoolIDMap(r.opManagerContext, r.context, r.clusterInfo, cephBlockPool)
if err != nil {
return reconcileResponse, errors.Wrapf(err, "failed to update pool ID mapping config for the pool %q", cephBlockPool.Name)
}
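Taken together, the changes replace ad hoc context.TODO() usage with a single context propagated from the operator manager or cluster info (r.opManagerContext, c.clusterInfo.Context), presumably so that an operator shutdown or a cancelled reconcile also cancels the in-flight Kubernetes API calls made by these helpers; the unit tests continue to pass context.TODO() where no cancellation is needed.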