Merge pull request #9162 from y1r/add-context-k8sutil-job
core: add context parameter to k8sutil job
leseb committed Nov 15, 2021
2 parents 07e1ca6 + 3799542 commit f44b943
Showing 18 changed files with 38 additions and 35 deletions.
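Every change follows one pattern: helpers in `k8sutil` and `cmdreporter` that previously minted their own `context.TODO()` now take a `context.Context` as their first parameter, and each call site threads the operator's or cluster's context through. A minimal before/after sketch of the pattern — the package and helper names here are illustrative, not the Rook code itself:

```go
package jobsketch // hypothetical package, for illustration only

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Before: the helper minted its own context internally, so nothing upstream
// could cancel the in-flight API call.
func deleteJobBefore(clientset kubernetes.Interface, namespace, name string) error {
	ctx := context.TODO()
	return clientset.BatchV1().Jobs(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}

// After: the caller supplies the context (typically the operator manager's),
// so cancellation propagates down to the API call.
func deleteJobAfter(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	return clientset.BatchV1().Jobs(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}
```

With the caller owning the context, an operator shutdown (or any upstream cancellation) aborts in-flight Kubernetes API calls instead of leaving them dangling.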
2 changes: 1 addition & 1 deletion pkg/daemon/ceph/osd/remove.go
@@ -102,7 +102,7 @@ func removeOSD(clusterdContext *clusterd.Context, clusterInfo *client.ClusterInf
// Remove osd prepare job
for _, prepareJob := range prepareJobList.Items {
logger.Infof("removing the osd prepare job %q", prepareJob.GetName())
-if err := k8sutil.DeleteBatchJob(clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
+if err := k8sutil.DeleteBatchJob(clusterInfo.Context, clusterdContext.Clientset, clusterInfo.Namespace, prepareJob.GetName(), false); err != nil {
if err != nil {
// Continue with the cleanup even if the job fails to be deleted
logger.Errorf("failed to delete prepare job for osd %q. %v", prepareJob.GetName(), err)
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/cleanup.go
@@ -92,7 +92,7 @@ func (c *ClusterController) startCleanUpJobs(cluster *cephv1.CephCluster, cephHo
cephv1.GetCleanupAnnotations(cluster.Spec.Annotations).ApplyToObjectMeta(&job.ObjectMeta)
cephv1.GetCleanupLabels(cluster.Spec.Labels).ApplyToObjectMeta(&job.ObjectMeta)

-if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, true); err != nil {
+if err := k8sutil.RunReplaceableJob(c.OpManagerCtx, c.context.Clientset, job, true); err != nil {
logger.Errorf("failed to run cluster clean up job on node %q. %v", hostName, err)
}
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/osd/create.go
@@ -366,7 +366,7 @@ func (c *Cluster) runPrepareJob(osdProps *osdProperties, config *provisionConfig
return errors.Wrapf(err, "failed to generate osd provisioning job template for %s %q", nodeOrPVC, nodeOrPVCName)
}

-if err := k8sutil.RunReplaceableJob(c.context.Clientset, job, false); err != nil {
+if err := k8sutil.RunReplaceableJob(c.clusterInfo.Context, c.context.Clientset, job, false); err != nil {
return errors.Wrapf(err, "failed to run osd provisioning job for %s %q", nodeOrPVC, nodeOrPVCName)
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/rbd/controller.go
@@ -188,6 +188,7 @@ func (r *ReconcileCephRBDMirror) reconcile(request reconcile.Request) (reconcile

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+r.opManagerContext,
r.opConfig.Image,
cephRBDMirror.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/rbd/controller_test.go
@@ -62,7 +62,7 @@ func TestCephRBDMirrorController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/cluster/version.go
@@ -28,6 +28,7 @@ import (

func (c *ClusterController) detectAndValidateCephVersion(cluster *cluster) (*cephver.CephVersion, bool, error) {
version, err := controller.DetectCephVersion(
+c.OpManagerCtx,
c.rookImage,
cluster.Namespace,
detectVersionName,
9 changes: 5 additions & 4 deletions pkg/operator/ceph/controller/version.go
@@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"time"

@@ -61,7 +62,7 @@ func GetImageVersion(cephCluster cephv1.CephCluster) (*cephver.CephVersion, erro

// DetectCephVersion loads the ceph version from the image and checks that it meets the version requirements to
// run in the cluster
-func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
+func DetectCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*version.CephVersion, error) {
cephImage := cephClusterSpec.CephVersion.Image
logger.Infof("detecting the ceph image version for image %s...", cephImage)
versionReporter, err := cmdreporter.New(
@@ -86,7 +87,7 @@ func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.
cephv1.GetMonPlacement(cephClusterSpec.Placement).ApplyToPodSpec(&job.Spec.Template.Spec)
job.Spec.Template.Spec.Affinity.PodAntiAffinity = nil

-stdout, stderr, retcode, err := versionReporter.Run(detectCephVersionTimeout)
+stdout, stderr, retcode, err := versionReporter.Run(ctx, detectCephVersionTimeout)
if err != nil {
return nil, errors.Wrap(err, "failed to complete ceph version job")
}
@@ -108,9 +109,9 @@ func DetectCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.
return version, nil
}

-func CurrentAndDesiredCephVersion(rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+func CurrentAndDesiredCephVersion(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *cephclient.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
// Detect desired CephCluster version
-desiredCephVersion, err := DetectCephVersion(rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
+desiredCephVersion, err := DetectCephVersion(ctx, rookImage, namespace, fmt.Sprintf("%s-detect-version", jobName), ownerInfo, context.Clientset, cephClusterSpec)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to detect ceph image version")
}
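Version detection is exactly the kind of work that should die with the operator: it launches a Kubernetes job and blocks waiting on its result. A self-contained sketch of how the context now threads through that chain — `detectVersion` and `runJob` are stand-ins for `DetectCephVersion` and `cmdreporter.Run`, not the real functions:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// detectVersion stands in for controller.DetectCephVersion: it simply
// forwards the caller's context to the long-running job runner.
func detectVersion(ctx context.Context) (string, error) {
	return runJob(ctx, 15*time.Minute)
}

// runJob stands in for cmdreporter.Run: it honors both the caller's
// context and its own timeout.
func runJob(ctx context.Context, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case <-time.After(100 * time.Millisecond): // pretend the job finished
		return "ceph version 16.2.x", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	// opManagerCtx plays the role of the operator manager's context;
	// cancelling it aborts version detection mid-flight.
	opManagerCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	v, err := detectVersion(opManagerCtx)
	fmt.Println(v, err)
}
```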
2 changes: 1 addition & 1 deletion pkg/operator/ceph/csi/spec.go
@@ -645,7 +645,7 @@ func (r *ReconcileCSI) validateCSIVersion(ownerInfo *k8sutil.OwnerInfo) (*CephCS
job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: getNodeAffinity(r.opConfig.Parameters, provisionerNodeAffinityEnv, &corev1.NodeAffinity{}),
}
-stdout, _, retcode, err := versionReporter.Run(timeout)
+stdout, _, retcode, err := versionReporter.Run(r.opManagerContext, timeout)
if err != nil {
return nil, errors.Wrap(err, "failed to complete ceph CSI version job")
}
1 change: 1 addition & 0 deletions pkg/operator/ceph/file/controller.go
@@ -261,6 +261,7 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+r.opManagerContext,
r.opConfig.Image,
cephFilesystem.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/file/controller_test.go
@@ -154,7 +154,7 @@ func TestCephFilesystemController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/file/mirror/controller.go
@@ -175,6 +175,7 @@ func (r *ReconcileFilesystemMirror) reconcile(request reconcile.Request) (reconc

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+r.opManagerContext,
r.opConfig.Image,
filesystemMirror.Namespace,
controllerName,
6 changes: 3 additions & 3 deletions pkg/operator/ceph/file/mirror/controller_test.go
@@ -58,7 +58,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {

// Mock cmd reporter
// pacific := &version.CephVersion{Major: 16, Minor: 2, Extra: 1, Build: 0, CommitID: ""}
-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Octopus, nil
}

@@ -224,7 +224,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
},
}

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Pacific, nil
}

@@ -247,7 +247,7 @@ func TestCephFilesystemMirrorController(t *testing.T) {
},
}

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Pacific, &version.Pacific, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/nfs/controller.go
@@ -225,6 +225,7 @@ func (r *ReconcileCephNFS) reconcile(request reconcile.Request) (reconcile.Resul

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+r.opManagerContext,
r.opConfig.Image,
cephNFS.Namespace,
controllerName,
2 changes: 1 addition & 1 deletion pkg/operator/ceph/nfs/controller_test.go
@@ -148,7 +148,7 @@ func TestCephNFSController(t *testing.T) {
},
}

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*version.CephVersion, *version.CephVersion, error) {
return &version.Octopus, &version.Octopus, nil
}

1 change: 1 addition & 0 deletions pkg/operator/ceph/object/controller.go
@@ -293,6 +293,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci

// Detect desired CephCluster version
runningCephVersion, desiredCephVersion, err := currentAndDesiredCephVersion(
+r.opManagerContext,
r.opConfig.Image,
cephObjectStore.Namespace,
controllerName,
4 changes: 2 additions & 2 deletions pkg/operator/ceph/object/controller_test.go
@@ -365,7 +365,7 @@ func TestCephObjectStoreController(t *testing.T) {
},
}

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

@@ -698,7 +698,7 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) {
},
}

-currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
+currentAndDesiredCephVersion = func(ctx context.Context, rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

23 changes: 11 additions & 12 deletions pkg/operator/k8sutil/cmdreporter/cmdreporter.go
@@ -144,8 +144,7 @@ func (cr *CmdReporter) Job() *batch.Job {
// and retcode of the command as long as the image ran it, even if the retcode is nonzero (failure).
// An error is reported only if the command was not run to completion successfully. When this
// returns, the ConfigMap is cleaned up (destroyed).
-func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
-ctx := context.TODO()
+func (cr *CmdReporter) Run(ctx context.Context, timeout time.Duration) (stdout, stderr string, retcode int, retErr error) {
jobName := cr.job.Name
namespace := cr.job.Namespace
errMsg := fmt.Sprintf("failed to run CmdReporter %s successfully", jobName)
@@ -161,11 +160,11 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
return "", "", -1, fmt.Errorf("%s. failed to delete existing results ConfigMap %s. %+v", errMsg, jobName, err)
}

-if err := k8sutil.RunReplaceableJob(cr.clientset, cr.job, true); err != nil {
+if err := k8sutil.RunReplaceableJob(ctx, cr.clientset, cr.job, true); err != nil {
return "", "", -1, fmt.Errorf("%s. failed to run job. %+v", errMsg, err)
}

-if err := cr.waitForConfigMap(timeout); err != nil {
+if err := cr.waitForConfigMap(ctx, timeout); err != nil {
return "", "", -1, fmt.Errorf("%s. failed waiting for results ConfigMap %s. %+v", errMsg, jobName, err)
}
logger.Debugf("job %s has returned results", jobName)
@@ -175,7 +174,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
return "", "", -1, fmt.Errorf("%s. results ConfigMap %s should be available, but got an error instead. %+v", errMsg, jobName, err)
}

-if err := k8sutil.DeleteBatchJob(cr.clientset, namespace, jobName, false); err != nil {
+if err := k8sutil.DeleteBatchJob(ctx, cr.clientset, namespace, jobName, false); err != nil {
logger.Errorf("continuing after failing delete job %s; user may need to delete it manually. %+v", jobName, err)
}

@@ -206,8 +205,7 @@ func (cr *CmdReporter) Run(timeout time.Duration) (stdout, stderr string, retcod
}

// return watcher or nil if configmap exists
-func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
-ctx := context.TODO()
+func (cr *CmdReporter) newWatcher(ctx context.Context) (watch.Interface, error) {
jobName := cr.job.Name
namespace := cr.job.Namespace

@@ -239,10 +237,10 @@ func (cr *CmdReporter) newWatcher() (watch.Interface, error) {
}

// return nil when configmap exists
-func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
+func (cr *CmdReporter) waitForConfigMap(ctx context.Context, timeout time.Duration) error {
jobName := cr.job.Name

-watcher, err := cr.newWatcher()
+watcher, err := cr.newWatcher(ctx)
if err != nil {
return fmt.Errorf("failed to start watcher for the results ConfigMap. %+v", err)
}
@@ -257,7 +255,7 @@ func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {

// timeout timer cannot be started inline in the select statement, or the timeout will be
// restarted any time k8s hangs up on the watcher and a new watcher is started
-timeoutCh := time.After(timeout)
+ctxWithTimeout, cancelFunc := context.WithTimeout(ctx, timeout)
+defer cancelFunc()

for {
select {
@@ -269,14 +268,14 @@ func (cr *CmdReporter) waitForConfigMap(timeout time.Duration) error {
// clears its change history, which it keeps for only a limited time (~5 mins default)
logger.Infof("Kubernetes hung up the watcher for CmdReporter %s result ConfigMap %s; starting a replacement watcher", jobName, jobName)
watcher.Stop() // must clean up existing watcher before replacing it with a new one
-watcher, err = cr.newWatcher()
+watcher, err = cr.newWatcher(ctxWithTimeout)
if err != nil {
return fmt.Errorf("failed to start replacement watcher for the results ConfigMap. %+v", err)
}
if watcher == nil {
return nil
}
-case <-timeoutCh:
+case <-ctxWithTimeout.Done():
return fmt.Errorf("timed out waiting for results ConfigMap")
}
}
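The `waitForConfigMap` change is the subtle one in this file: replacing `time.After` with `context.WithTimeout` folds the timeout and parent-context cancellation into a single `Done()` channel, so the wait also ends when the operator's context is cancelled, not only when the timer fires. A sketch of the pattern, with the two exit reasons told apart via `Err()` — the hunk above returns one generic timeout error for both cases, so the distinction here is an illustrative extra:

```go
package cmdwait // illustrative package, not part of the Rook tree

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForResult sketches the new wait loop: one Done() channel covers both
// the timeout and cancellation of the parent ctx.
func waitForResult(ctx context.Context, timeout time.Duration, results <-chan string) (string, error) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case r := <-results:
		return r, nil
	case <-ctxWithTimeout.Done():
		if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) {
			return "", fmt.Errorf("timed out waiting for results ConfigMap")
		}
		return "", fmt.Errorf("cancelled while waiting for results ConfigMap: %w", ctxWithTimeout.Err())
	}
}
```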
11 changes: 4 additions & 7 deletions pkg/operator/k8sutil/job.go
@@ -33,8 +33,7 @@ import (
// another call to this function with the same job name. For example, if a storage operator is
// restarted/updated before the job can complete, the operator's next run of the job should replace
// the previous job if deleteIfFound is set to true.
-func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
-ctx := context.TODO()
+func RunReplaceableJob(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, deleteIfFound bool) error {
// check if the job was already created and what its status is
existingJob, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
@@ -49,7 +48,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

// delete the job that already exists from a previous run
logger.Infof("Removing previous job %s to start a new one", job.Name)
-err := DeleteBatchJob(clientset, job.Namespace, existingJob.Name, true)
+err := DeleteBatchJob(ctx, clientset, job.Namespace, existingJob.Name, true)
if err != nil {
return fmt.Errorf("failed to remove job %s. %+v", job.Name, err)
}
@@ -61,8 +60,7 @@ func RunReplaceableJob(clientset kubernetes.Interface, job *batch.Job, deleteIfF

// WaitForJobCompletion waits for a job to reach the completed state.
// Assumes that only one pod needs to complete.
-func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
-ctx := context.TODO()
+func WaitForJobCompletion(ctx context.Context, clientset kubernetes.Interface, job *batch.Job, timeout time.Duration) error {
logger.Infof("waiting for job %s to complete...", job.Name)
return wait.Poll(5*time.Second, timeout, func() (bool, error) {
job, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
@@ -87,11 +85,10 @@ func WaitForJobCompletion(clientset kubernetes.Interface, job *batch.Job, timeou
}

// DeleteBatchJob deletes a Kubernetes job.
-func DeleteBatchJob(clientset kubernetes.Interface, namespace, name string, wait bool) error {
+func DeleteBatchJob(ctx context.Context, clientset kubernetes.Interface, namespace, name string, wait bool) error {
propagation := metav1.DeletePropagationForeground
gracePeriod := int64(0)
options := &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod, PropagationPolicy: &propagation}
-ctx := context.TODO()
if err := clientset.BatchV1().Jobs(namespace).Delete(ctx, name, *options); err != nil {
if errors.IsNotFound(err) {
return nil
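Putting the three `job.go` signatures together, a hedged usage sketch of the new call shapes — the package and function below are illustrative, and the import path assumes the Rook module is available:

```go
package jobsketch // illustrative; assumes the Rook module is on the import path

import (
	"context"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/rook/rook/pkg/operator/k8sutil"
)

// runAndReap sketches the new call shapes: every helper in job.go now takes
// the caller's context as its first argument.
func runAndReap(ctx context.Context, clientset kubernetes.Interface, job *batch.Job) error {
	// Replace any leftover job from a previous operator run, then start this one.
	if err := k8sutil.RunReplaceableJob(ctx, clientset, job, true); err != nil {
		return err
	}
	// Block until the job's single pod completes or the poll times out.
	if err := k8sutil.WaitForJobCompletion(ctx, clientset, job, 10*time.Minute); err != nil {
		return err
	}
	// Delete without waiting for foreground propagation to finish, matching
	// how removeOSD above passes false and continues cleanup on failure.
	return k8sutil.DeleteBatchJob(ctx, clientset, job.Namespace, job.Name, false)
}
```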
