diff --git a/pkg/daemon/ceph/osd/remove.go b/pkg/daemon/ceph/osd/remove.go
index 821a35f713c8..81ec4863ddb8 100644
--- a/pkg/daemon/ceph/osd/remove.go
+++ b/pkg/daemon/ceph/osd/remove.go
@@ -102,7 +102,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
 	// Remove osd prepare job
 	for _, prepareJob := range prepareJobList.Items {
 		logger.Infof("removing the osd prepare job %q", prepareJob.GetName())
-		if err := k8sutil.DeleteBatchJob(clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
+		if err := k8sutil.DeleteBatchJob(clusterInfo.Context, clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
 			if err != nil {
 				// Continue with the cleanup even if the job fails to be deleted
 				logger.Errorf("failed to delete prepare job for osd %q. %v", prepareJob.GetName(), err)
diff --git a/pkg/operator/ceph/cluster/cleanup.go b/pkg/operator/ceph/cluster/cleanup.go
index c86ccdee649e..4234d2aeb9db 100644
--- a/pkg/operator/ceph/cluster/cleanup.go
+++ b/pkg/operator/ceph/cluster/cleanup.go
@@ -92,7 +92,7 @@ func (c *ClusterController) startCleanUpJobs(cluster *cephv1.CephCluster, cephHo
 		cephv1.GetCleanupAnnotations(cluster.Spec.Annotations).ApplyToObjectMeta(&job.ObjectMeta)
 		cephv1.GetCleanupLabels(cluster.Spec.Labels).ApplyToObjectMeta(&job.ObjectMeta)

-		if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, true); err != nil {
+		if err := k8sutil.RunReplaceableJob(c.OpManagerCtx, c.context.Clientset, job, true); err != nil {
 			logger.Errorf("failed to run cluster clean up job on node %q. %v", hostName, err)
 		}
 	}
diff --git a/pkg/operator/ceph/cluster/osd/create.go b/pkg/operator/ceph/cluster/osd/create.go
index 173cecab0699..9b534cbfb01d 100644
--- a/pkg/operator/ceph/cluster/osd/create.go
+++ b/pkg/operator/ceph/cluster/osd/create.go
@@ -366,7 +366,7 @@ func (c *Cluster) runPrepareJob(osdProps *osdProperties, config *provisionConfig
 		return errors.Wrapf(err, "failed to generate osd provisioning job template for %s %q", nodeOrPVC, nodeOrPVCName)
 	}

-	if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, false); err != nil {
+	if err := k8sutil.RunReplaceableJob(c.clusterInfo.Context, c.context.Clientset, job, false); err != nil {
 		return errors.Wrapf(err, "failed to run osd provisioning job for %s %q", nodeOrPVC, nodeOrPVCName)
 	}

diff --git a/pkg/operator/ceph/cluster/rbd/controller.go b/pkg/operator/ceph/cluster/rbd/controller.go
index 2041bb0975f2..601890eab78f 100644
--- a/pkg/operator/ceph/cluster/rbd/controller.go
+++ b/pkg/operator/ceph/cluster/rbd/controller.go
@@ -188,6 +188,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile

 	// Detect desired CephCluster version
 	runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+		r.opManagerContext,
 		r.opConfig.Image,
 		cephRBDMirror.Namespace,
 		controllerName,
diff --git a/pkg/operator/ceph/cluster/rbd/controller_test.go b/pkg/operator/ceph/cluster/rbd/controller_test.go
index 24c641ab3b31..695c016a46de 100644
--- a/pkg/operator/ceph/cluster/rbd/controller_test.go
+++ b/pkg/operator/ceph/cluster/rbd/controller_test.go
@@ -62,7 +62,7 @@ func TestCephRBDMirrorController(t *testing.T) {
 	capnslog.SetGlobalLogLevel(capnslog.DEBUG)
 	os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Pacific, &version.Pacific, nil
 	}

diff --git a/pkg/operator/ceph/cluster/version.go b/pkg/operator/ceph/cluster/version.go
index 9a58b24b14de..a3308ed59dee 100644
--- a/pkg/operator/ceph/cluster/version.go
+++ b/pkg/operator/ceph/cluster/version.go
@@ -28,6 +28,7 @@ import (

 func (c *ClusterController) detectAndValidateCephVersion(cluster *cluster) (*cephver.CephVersion, bool, error) {
 	version, err := controller.DetectCephVersion(
+		c.OpManagerCtx,
 		c.rookImage,
 		cluster.Namespace,
 		detectVersionName,
diff --git a/pkg/operator/ceph/controller/version.go b/pkg/operator/ceph/controller/version.go
index e29c92c87c01..4a5b0b6e62b7 100644
--- a/pkg/operator/ceph/controller/version.go
+++ b/pkg/operator/ceph/controller/version.go
@@ -17,6 +17,7 @@ limitations under the License.
 package controller

 import (
+	"context"
 	"fmt"
 	"time"

@@ -61,7 +62,7 @@ func GetImageVersion(cephCluster cephv1.CephCluster) (*cephver.CephVersion, erro

 // DetectCephVersion loads the ceph version from the image and checks that it meets the version requirements to
 // run in the cluster
-func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
+func DetectCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
 	cephImage := cephClusterSpec.CephVersion.Image
 	logger.Infof("detecting the ceph image version for image %s...", cephImage)
 	versionReporter, err := cmdreporter.New(
@@ -86,7 +87,7 @@ func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.
 	cephv1.GetMonPlacement(cephClusterSpec.Placement).ApplyToPodSpec(&job.Spec.Template.Spec)
 	job.Spec.Template.Spec.Affinity.PodAntiAffinity = nil

-	stdout, stderr, retcode, err := versionReporter.Run(detectCephVersionTimeout)
+	stdout, stderr, retcode, err := versionReporter.Run(ctx, detectCephVersionTimeout)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to complete ceph version job")
 	}
@@ -108,9 +109,9 @@
 	return version, nil
 }

-func CurrentAndDesiredCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+func CurrentAndDesiredCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 	// Detect desired CephCluster version
-	desiredCephVersion, err := DetectCephVersion(rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
+	desiredCephVersion, err := DetectCephVersion(ctx, rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to detect ceph image version")
 	}
diff --git a/pkg/operator/ceph/csi/spec.go b/pkg/operator/ceph/csi/spec.go
index 581dfbcc3b7f..b9b64aaa1766 100644
--- a/pkg/operator/ceph/csi/spec.go
+++ b/pkg/operator/ceph/csi/spec.go
@@ -645,7 +645,7 @@ func (r *ReconcileCSI) validateCSIVersion(ownerInfo *k8sutil.OwnerInfo) (*CephCS
 	job.Spec.Template.Spec.Affinity = &corev1.Affinity{
 		NodeAffinity: getNodeAffinity(r.opConfig.Parameters, provisionerNodeAffinityEnv, &corev1.NodeAffinity{}),
 	}
-	stdout, _, retcode, err := versionReporter.Run(timeout)
+	stdout, _, retcode, err := versionReporter.Run(r.opManagerContext, timeout)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to complete ceph CSI version job")
 	}
diff --git a/pkg/operator/ceph/file/controller.go b/pkg/operator/ceph/file/controller.go
index 528320147de8..78344d743032 100644
--- a/pkg/operator/ceph/file/controller.go
+++ b/pkg/operator/ceph/file/controller.go
@@ -261,6 +261,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil

 	// Detect desired CephCluster version
 	runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+		r.opManagerContext,
 		r.opConfig.Image,
 		cephFilesystem.Namespace,
 		controllerName,
diff --git a/pkg/operator/ceph/file/controller_test.go b/pkg/operator/ceph/file/controller_test.go
index bc2a4f42cdd7..4840729dfde1 100644
--- a/pkg/operator/ceph/file/controller_test.go
+++ b/pkg/operator/ceph/file/controller_test.go
@@ -154,7 +154,7 @@ func TestCephFilesystemController(t *testing.T) {
 	capnslog.SetGlobalLogLevel(capnslog.DEBUG)
 	os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Pacific, &version.Pacific, nil
 	}

diff --git a/pkg/operator/ceph/file/mirror/controller.go b/pkg/operator/ceph/file/mirror/controller.go
index 9f1faaa9b794..001c7c838741 100644
--- a/pkg/operator/ceph/file/mirror/controller.go
+++ b/pkg/operator/ceph/file/mirror/controller.go
@@ -175,6 +175,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc

 	// Detect desired CephCluster version
 	runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+		r.opManagerContext,
 		r.opConfig.Image,
 		filesystemMirror.Namespace,
 		controllerName,
diff --git a/pkg/operator/ceph/file/mirror/controller_test.go b/pkg/operator/ceph/file/mirror/controller_test.go
index 7d38e3a68a22..0cc998e24698 100644
--- a/pkg/operator/ceph/file/mirror/controller_test.go
+++ b/pkg/operator/ceph/file/mirror/controller_test.go
@@ -58,7 +58,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
 	// Mock cmd reporter
 	// pacific := &version.CephVersion{Major: 16, Minor: 2, Extra: 1, Build: 0, CommitID: ""}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Octopus, &version.Octopus, nil
 	}

@@ -224,7 +224,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
 		},
 	}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Octopus, &version.Pacific, nil
 	}

@@ -247,7 +247,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
 		},
 	}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Pacific, &version.Pacific, nil
 	}

diff --git a/pkg/operator/ceph/nfs/controller.go b/pkg/operator/ceph/nfs/controller.go
index d17cc9abf658..0107e2ef1e47 100644
--- a/pkg/operator/ceph/nfs/controller.go
+++ b/pkg/operator/ceph/nfs/controller.go
@@ -225,6 +225,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul

 	// Detect desired CephCluster version
 	runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+		r.opManagerContext,
 		r.opConfig.Image,
 		cephNFS.Namespace,
 		controllerName,
diff --git a/pkg/operator/ceph/nfs/controller_test.go b/pkg/operator/ceph/nfs/controller_test.go
index 9dd6e5dd240e..33c815157236 100644
--- a/pkg/operator/ceph/nfs/controller_test.go
+++ b/pkg/operator/ceph/nfs/controller_test.go
@@ -148,7 +148,7 @@ func TestCephNFSController(t *testing.T) {
 		},
 	}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
 		return &version.Octopus, &version.Octopus, nil
 	}

diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go
index 737da3eb6866..633f000f1f97 100644
--- a/pkg/operator/ceph/object/controller.go
+++ b/pkg/operator/ceph/object/controller.go
@@ -293,6 +293,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

 	// Detect desired CephCluster version
 	runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+		r.opManagerContext,
 		r.opConfig.Image,
 		cephObjectStore.Namespace,
 		controllerName,
diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go
index ffb325b21f67..0a19ada55b0d 100644
--- a/pkg/operator/ceph/object/controller_test.go
+++ b/pkg/operator/ceph/object/controller_test.go
@@ -365,7 +365,7 @@ func TestCephObjectStoreController(t *testing.T) {
 		},
 	}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
 		return &cephver.Pacific, &cephver.Pacific, nil
 	}

@@ -698,7 +698,7 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) {
 		},
 	}

-	currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
+	currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
 		return &cephver.Pacific, &cephver.Pacific, nil
 	}

diff --git a/pkg/operator/k8sutil/cmdreporter/cmdreporter.go b/pkg/operator/k8sutil/cmdreporter/cmdreporter.go
index 113151a790da..8e38cf3227f5 100644
--- a/pkg/operator/k8sutil/cmdreporter/cmdreporter.go
+++ b/pkg/operator/k8sutil/cmdreporter/cmdreporter.go
@@ -144,8 +144,7 @@ func (cr *CmdReporter) Job() *batch.Job {
 // and retcode of the command as long as the image ran it, even if the retcode is nonzero (failure).
 // An error is reported only if the command was not run to completion successfully. When this
 // returns, the ConfigMap is cleaned up (destroyed).
-func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
-	ctx := context.TODO()
+func (cr *CmdReporter) Run(ctx context.Context, timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
 	jobName := cr.job.Name
 	namespace := cr.job.Namespace
 	errMsg := fmt.Sprintf("failed to run CmdReporter %s successfully", jobName)
@@ -161,11 +160,11 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
 		return "", "", -1, fmt.Errorf("%s. failed to delete existing results ConfigMap %s. %+v", errMsg, jobName, err)
 	}

-	if err := k8sutil.RunReplaceableJob(cr.clientset, cr.job, true); err != nil {
+	if err := k8sutil.RunReplaceableJob(ctx, cr.clientset, cr.job, true); err != nil {
 		return "", "", -1, fmt.Errorf("%s. failed to run job. %+v", errMsg, err)
 	}

-	if err := cr.waitForConfigMap(timeout); err != nil {
+	if err := cr.waitForConfigMap(ctx, timeout); err != nil {
 		return "", "", -1, fmt.Errorf("%s. failed waiting for results ConfigMap %s. %+v", errMsg, jobName, err)
 	}
 	logger.Debugf("job %s has returned results", jobName)
@@ -175,7 +174,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
 		return "", "", -1, fmt.Errorf("%s. results ConfigMap %s should be available, but got an error instead. %+v", errMsg, jobName, err)
 	}

-	if err := k8sutil.DeleteBatchJob(cr.clientset, namespace, jobName, false); err != nil {
+	if err := k8sutil.DeleteBatchJob(ctx, cr.clientset, namespace, jobName, false); err != nil {
 		logger.Errorf("continuing after failing delete job %s; user may need to delete it manually. %+v", jobName, err)
 	}

@@ -206,8 +205,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
 }

 // return watcher or nil if configmap exists
-func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
-	ctx := context.TODO()
+func (cr *CmdReporter) newWatcher(ctx context.Context) (watch.Interface, error) {
 	jobName := cr.job.Name
 	namespace := cr.job.Namespace

@@ -239,10 +237,10 @@ func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
 }

 // return nil when configmap exists
-func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
+func (cr *CmdReporter) waitForConfigMap(ctx context.Context, timeout time.Duration) error {
 	jobName := cr.job.Name

-	watcher, err := cr.newWatcher()
+	watcher, err := cr.newWatcher(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to start watcher for the results ConfigMap. %+v", err)
 	}
@@ -257,7 +255,8 @@ func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {

 	// timeout timer cannot be started inline in the select statement, or the timeout will be
 	// restarted any time k8s hangs up on the watcher and a new watcher is started
-	timeoutCh := time.After(timeout)
+	ctxWithTimeout, cancelFunc := context.WithTimeout(ctx, timeout)
+	defer cancelFunc()

 	for {
 		select {
@@ -269,14 +268,14 @@ func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
 			// clears its change history, which it keeps for only a limited time (~5 mins default)
 			logger.Infof("Kubernetes hung up the watcher for CmdReporter %s result ConfigMap %s; starting a replacement watcher", jobName, jobName)
 			watcher.Stop() // must clean up existing watcher before replacing it with a new one
-			watcher, err = cr.newWatcher()
+			watcher, err = cr.newWatcher(ctxWithTimeout)
 			if err != nil {
 				return fmt.Errorf("failed to start replacement watcher for the results ConfigMap. %+v", err)
 			}
 			if watcher == nil {
 				return nil
 			}
-		case <-timeoutCh:
+		case <-ctxWithTimeout.Done():
 			return fmt.Errorf("timed out waiting for results ConfigMap")
 		}
 	}
diff --git a/pkg/operator/k8sutil/job.go b/pkg/operator/k8sutil/job.go
index ad0e50686227..14d75eb293d7 100644
--- a/pkg/operator/k8sutil/job.go
+++ b/pkg/operator/k8sutil/job.go
@@ -33,8 +33,7 @@ import (
 // another call to this function with the same job name. For example, if a storage operator is
 // restarted/updated before the job can complete, the operator's next run of the job should replace
 // the previous job if deleteIfFound is set to true.
-func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
-	ctx := context.TODO()
+func RunReplaceableJob(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
 	// check if the job was already created and what its status is
 	existingJob, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
 	if err != nil && !errors.IsNotFound(err) {
@@ -49,7 +48,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

 		// delete the job that already exists from a previous run
 		logger.Infof("Removing previous job %s to start a new one", job.Name)
-		err := DeleteBatchJob(clientset, job.Namespace, existingJob.Name, true)
+		err := DeleteBatchJob(ctx, clientset, job.Namespace, existingJob.Name, true)
 		if err != nil {
 			return fmt.Errorf("failed to remove job %s. %+v", job.Name, err)
 		}
@@ -61,8 +60,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

 // WaitForJobCompletion waits for a job to reach the completed state.
 // Assumes that only one pod needs to complete.
-func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
-	ctx := context.TODO()
+func WaitForJobCompletion(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
 	logger.Infof("waiting for job %s to complete...", job.Name)
 	return wait.Poll(5*time.Second, timeout, func() (bool, error) {
 		job, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
@@ -87,11 +85,10 @@ func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeou
 }

 // DeleteBatchJob deletes a Kubernetes job.
-func DeleteBatchJob(clientset kubernetes.Interface, namespace, name string, wait bool) error {
+func DeleteBatchJob(ctx context.Context, clientset kubernetes.Interface, namespace, name string, wait bool) error {
 	propagation := metav1.DeletePropagationForeground
 	gracePeriod := int64(0)
 	options := &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod, PropagationPolicy: &propagation}
-	ctx := context.TODO()
 	if err := clientset.BatchV1().Jobs(namespace).Delete(ctx, name, *options); err != nil {
 		if errors.IsNotFound(err) {
 			return nil