core: add context parameter to k8sutil job
This commit adds a context parameter to the k8sutil job functions. With this, cancellation can be handled during the API calls on job resources.

Signed-off-by: Yuichiro Ueno <y1r.ueno@gmail.com>
y1r committed Nov 15, 2021
1 parent 7d87de8 commit 9bcda2c
Showing 18 changed files with 35 additions and 33 deletions.
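For illustration, a minimal Go sketch of what the new parameter enables for a caller; the clientset, namespace, and job name below are assumed placeholders rather than code from this commit:

```go
package example

import (
	"context"
	"time"

	"github.com/rook/rook/pkg/operator/k8sutil"
	"k8s.io/client-go/kubernetes"
)

// deleteJobWithTimeout bounds the delete call; when the context expires or is
// cancelled (for example on operator shutdown), the underlying Kubernetes API
// call returns instead of blocking.
func deleteJobWithTimeout(clientset kubernetes.Interface, namespace, jobName string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return k8sutil.DeleteBatchJob(ctx, clientset, namespace, jobName, false)
}
```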
2 changes: 1 addition & 1 deletion pkg/daemon/ceph/osd/remove.go
@@ -102,7 +102,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
// Remove osd prepare job
for _, prepareJob := range prepareJobList.Items {
logger.Infof("removing the osd prepare job %q", prepareJob.GetName())
if err := k8sutil.DeleteBatchJob(clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
if err := k8sutil.DeleteBatchJob(clusterInfo.Context, clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
if err != nil {
// Continue with the cleanup even if the job fails to be deleted
logger.Errorf("failed to delete prepare job for osd %q. %v", prepareJob.GetName(), err)
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/cleanup.go
@@ -92,7 +92,7 @@ func (c *ClusterController) startCleanUpJobs(cluster *cephv1.CephCluster, cephHo
cephv1.GetCleanupAnnotations(cluster.Spec.Annotations).ApplyToObjectMeta(&job.ObjectMeta)
cephv1.GetCleanupLabels(cluster.Spec.Labels).ApplyToObjectMeta(&job.ObjectMeta)

if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, true); err != nil {
if err := k8sutil.RunReplaceableJob(c.OpManagerCtx, c.context.Clientset, job, true); err != nil {
logger.Errorf("failed to run cluster clean up job on node %q. %v", hostName, err)
}
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/create.go
@@ -366,7 +366,7 @@ func (c *Cluster) runPrepareJob(osdProps *osdProperties, config *provisionConfig
return errors.Wrapf(err, "failed to generate osd provisioning job template for %s %q", nodeOrPVC, nodeOrPVCName)
}

if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, false); err != nil {
if err := k8sutil.RunReplaceableJob(c.clusterInfo.Context, c.context.Clientset, job, false); err != nil {
return errors.Wrapf(err, "failed to run osd provisioning job for %s %q", nodeOrPVC, nodeOrPVCName)
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/rbd/controller.go
@@ -188,6 +188,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
r.opManagerContext,
r.opConfig.Image,
cephRBDMirror.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller_test.go
@@ -62,7 +62,7 @@ func TestCephRBDMirrorController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/version.go
@@ -28,6 +28,7 @@ import (

func (c *ClusterController) detectAndValidateCephVersion(cluster *cluster) (*cephver.CephVersion, bool, error) {
version, err := controller.DetectCephVersion(
c.OpManagerCtx,
c.rookImage,
cluster.Namespace,
detectVersionName,
9 changes: 5 additions & 4 deletions pkg/operator/ceph/controller/version.go
@@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"time"

@@ -61,7 +62,7 @@ func GetImageVersion(cephCluster cephv1.CephCluster) (*cephver.CephVersion, erro

// DetectCephVersion loads the ceph version from the image and checks that it meets the version requirements to
// run in the cluster
func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
func DetectCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
cephImage := cephClusterSpec.CephVersion.Image
logger.Infof("detecting the ceph image version for image %s...", cephImage)
versionReporter, err := cmdreporter.New(
@@ -86,7 +87,7 @@ func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.
cephv1.GetMonPlacement(cephClusterSpec.Placement).ApplyToPodSpec(&job.Spec.Template.Spec)
job.Spec.Template.Spec.Affinity.PodAntiAffinity = nil

stdout, stderr, retcode, err := versionReporter.Run(detectCephVersionTimeout)
stdout, stderr, retcode, err := versionReporter.Run(ctx, detectCephVersionTimeout)
if err != nil {
return nil, errors.Wrap(err, "failed to complete ceph version job")
}
@@ -108,9 +109,9 @@ func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.
return version, nil
}

func CurrentAndDesiredCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
func CurrentAndDesiredCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
// Detect desired CephCluster version
desiredCephVersion, err := DetectCephVersion(rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
desiredCephVersion, err := DetectCephVersion(ctx, rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to detect ceph image version")
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/csi/spec.go
@@ -645,7 +645,7 @@ func (r *ReconcileCSI) validateCSIVersion(ownerInfo *k8sutil.OwnerInfo) (*CephCS
job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: getNodeAffinity(r.opConfig.Parameters, provisionerNodeAffinityEnv, &corev1.NodeAffinity{}),
}
stdout, _, retcode, err := versionReporter.Run(timeout)
stdout, _, retcode, err := versionReporter.Run(r.opManagerContext, timeout)
if err != nil {
return nil, errors.Wrap(err, "failed to complete ceph CSI version job")
}
1 change: 1 addition & 0 deletions pkg/operator/ceph/file/controller.go
@@ -261,6 +261,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
r.opManagerContext,
r.opConfig.Image,
cephFilesystem.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller_test.go
@@ -154,7 +154,7 @@ func TestCephFilesystemController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/file/mirror/controller.go
@@ -175,6 +175,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
r.opManagerContext,
r.opConfig.Image,
filesystemMirror.Namespace,
controllerName,
6 changes: 3 additions & 3 deletions pkg/operator/ceph/file/mirror/controller_test.go
@@ -58,7 +58,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {

// Mock cmd reporter
// pacific := &version.CephVersion{Major: 16, Minor: 2, Extra: 1, Build: 0, CommitID: ""}
currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Octopus, nil
}

@@ -224,7 +224,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Pacific, nil
}

@@ -247,7 +247,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/nfs/controller.go
@@ -225,6 +225,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
r.opManagerContext,
r.opConfig.Image,
cephNFS.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller_test.go
@@ -148,7 +148,7 @@ func TestCephNFSController(t *testing.T) {
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Octopus, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/object/controller.go
@@ -293,6 +293,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
r.opManagerContext,
r.opConfig.Image,
cephObjectStore.Namespace,
controllerName,
4 changes: 2 additions & 2 deletions pkg/operator/ceph/object/controller_test.go
@@ -365,7 +365,7 @@ func TestCephObjectStoreController(t *testing.T) {
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

@@ -698,7 +698,7 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) {
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

18 changes: 8 additions & 10 deletions pkg/operator/k8sutil/cmdreporter/cmdreporter.go
@@ -144,8 +144,7 @@ func (cr *CmdReporter) Job() *batch.Job {
// and retcode of the command as long as the image ran it, even if the retcode is nonzero (failure).
// An error is reported only if the command was not run to completion successfully. When this
// returns, the ConfigMap is cleaned up (destroyed).
func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
ctx := context.TODO()
func (cr *CmdReporter) Run(ctx context.Context, timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
jobName := cr.job.Name
namespace := cr.job.Namespace
errMsg := fmt.Sprintf("failed to run CmdReporter %s successfully", jobName)
@@ -161,11 +160,11 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
return "", "", -1, fmt.Errorf("%s. failed to delete existing results ConfigMap %s. %+v", errMsg, jobName, err)
}

if err := k8sutil.RunReplaceableJob(cr.clientset, cr.job, true); err != nil {
if err := k8sutil.RunReplaceableJob(ctx, cr.clientset, cr.job, true); err != nil {
return "", "", -1, fmt.Errorf("%s. failed to run job. %+v", errMsg, err)
}

if err := cr.waitForConfigMap(timeout); err != nil {
if err := cr.waitForConfigMap(ctx, timeout); err != nil {
return "", "", -1, fmt.Errorf("%s. failed waiting for results ConfigMap %s. %+v", errMsg, jobName, err)
}
logger.Debugf("job %s has returned results", jobName)
@@ -175,7 +174,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
return "", "", -1, fmt.Errorf("%s. results ConfigMap %s should be available, but got an error instead. %+v", errMsg, jobName, err)
}

if err := k8sutil.DeleteBatchJob(cr.clientset, namespace, jobName, false); err != nil {
if err := k8sutil.DeleteBatchJob(ctx, cr.clientset, namespace, jobName, false); err != nil {
logger.Errorf("continuing after failing delete job %s; user may need to delete it manually. %+v", jobName, err)
}

@@ -206,8 +205,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
}

// return watcher or nil if configmap exists
func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
ctx := context.TODO()
func (cr *CmdReporter) newWatcher(ctx context.Context) (watch.Interface, error) {
jobName := cr.job.Name
namespace := cr.job.Namespace

@@ -239,10 +237,10 @@ func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
}

// return nil when configmap exists
func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
func (cr *CmdReporter) waitForConfigMap(ctx context.Context, timeout time.Duration) error {
jobName := cr.job.Name

watcher, err := cr.newWatcher()
watcher, err := cr.newWatcher(ctx)
if err != nil {
return fmt.Errorf("failed to start watcher for the results ConfigMap. %+v", err)
}
@@ -269,7 +267,7 @@ func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
// clears its change history, which it keeps for only a limited time (~5 mins default)
logger.Infof("Kubernetes hung up the watcher for CmdReporter %s result ConfigMap %s; starting a replacement watcher", jobName, jobName)
watcher.Stop() // must clean up existing watcher before replacing it with a new one
watcher, err = cr.newWatcher()
watcher, err = cr.newWatcher(ctx)
if err != nil {
return fmt.Errorf("failed to start replacement watcher for the results ConfigMap. %+v", err)
}
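A usage sketch only: the reporter is assumed to be a *cmdreporter.CmdReporter built elsewhere with cmdreporter.New, and the timeout value is illustrative. The updated Run threads the caller's context through the job and the ConfigMap watcher:

```go
package example

import (
	"context"
	"time"

	"github.com/pkg/errors"
	"github.com/rook/rook/pkg/operator/k8sutil/cmdreporter"
)

// runReporter runs the reporter job under the caller's context; cancelling ctx
// aborts the job creation, the ConfigMap watch, and the cleanup calls.
func runReporter(ctx context.Context, reporter *cmdreporter.CmdReporter) (string, error) {
	stdout, stderr, retcode, err := reporter.Run(ctx, 15*time.Minute)
	if err != nil {
		return "", errors.Wrap(err, "failed to complete cmd reporter job")
	}
	if retcode != 0 {
		return "", errors.Errorf("cmd reporter job returned %d: %s", retcode, stderr)
	}
	return stdout, nil
}
```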
11 changes: 4 additions & 7 deletions pkg/operator/k8sutil/job.go
@@ -33,8 +33,7 @@ import (
// another call to this function with the same job name. For example, if a storage operator is
// restarted/updated before the job can complete, the operator's next run of the job should replace
// the previous job if deleteIfFound is set to true.
func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
ctx := context.TODO()
func RunReplaceableJob(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
// check if the job was already created and what its status is
existingJob, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
@@ -49,7 +48,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

// delete the job that already exists from a previous run
logger.Infof("Removing previous job %s to start a new one", job.Name)
err := DeleteBatchJob(clientset, job.Namespace, existingJob.Name, true)
err := DeleteBatchJob(ctx, clientset, job.Namespace, existingJob.Name, true)
if err != nil {
return fmt.Errorf("failed to remove job %s. %+v", job.Name, err)
}
@@ -61,8 +60,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

// WaitForJobCompletion waits for a job to reach the completed state.
// Assumes that only one pod needs to complete.
func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
ctx := context.TODO()
func WaitForJobCompletion(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
logger.Infof("waiting for job %s to complete...", job.Name)
return wait.Poll(5*time.Second, timeout, func() (bool, error) {
job, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
@@ -87,11 +85,10 @@ func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeou
}

// DeleteBatchJob deletes a Kubernetes job.
func DeleteBatchJob(clientset kubernetes.Interface, namespace, name string, wait bool) error {
func DeleteBatchJob(ctx context.Context, clientset kubernetes.Interface, namespace, name string, wait bool) error {
propagation := metav1.DeletePropagationForeground
gracePeriod := int64(0)
options := &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod, PropagationPolicy: &propagation}
ctx := context.TODO()
if err := clientset.BatchV1().Jobs(namespace).Delete(ctx, name, *options); err != nil {
if errors.IsNotFound(err) {
return nil
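A hedged sketch of the resulting call pattern for these helpers; the long-lived operator context, the clientset, and the job object are assumed to come from the caller, as in the reconcilers above:

```go
package example

import (
	"context"
	"time"

	"github.com/pkg/errors"
	"github.com/rook/rook/pkg/operator/k8sutil"
	batch "k8s.io/api/batch/v1"
	"k8s.io/client-go/kubernetes"
)

// runAndWait replaces any leftover job from a previous operator run, then waits
// for it to complete. Cancelling opManagerCtx causes the API calls inside both
// helpers to fail fast rather than run to the full timeout.
func runAndWait(opManagerCtx context.Context, clientset kubernetes.Interface, job *batch.Job) error {
	if err := k8sutil.RunReplaceableJob(opManagerCtx, clientset, job, true); err != nil {
		return errors.Wrapf(err, "failed to run job %q", job.Name)
	}
	return k8sutil.WaitForJobCompletion(opManagerCtx, clientset, job, 10*time.Minute)
}
```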
