From 03e976b52c9e005a9908665165215d3c6801d090 Mon Sep 17 00:00:00 2001 From: Yuichiro Ueno Date: Sat, 11 Dec 2021 11:40:28 +0900 Subject: [PATCH] core: add context parameter to k8sutil service This commit adds context parameter to k8sutil service functions. By this, we can handle cancellation during API call of service resource. Signed-off-by: Yuichiro Ueno --- pkg/operator/ceph/cluster/cluster_external.go | 2 +- pkg/operator/ceph/cluster/mgr/dashboard.go | 2 +- pkg/operator/ceph/cluster/mgr/mgr.go | 2 +- pkg/operator/ceph/cluster/mon/service.go | 2 +- pkg/operator/ceph/cr_manager.go | 2 +- pkg/operator/ceph/csi/spec.go | 6 +++--- pkg/operator/ceph/object/spec.go | 2 +- pkg/operator/ceph/webhook.go | 4 ++-- pkg/operator/k8sutil/service.go | 11 ++++------- 9 files changed, 15 insertions(+), 18 deletions(-) diff --git a/pkg/operator/ceph/cluster/cluster_external.go b/pkg/operator/ceph/cluster/cluster_external.go index 95e071fd3057..284d34a9ab55 100644 --- a/pkg/operator/ceph/cluster/cluster_external.go +++ b/pkg/operator/ceph/cluster/cluster_external.go @@ -213,7 +213,7 @@ func (c *ClusterController) configureExternalClusterMonitoring(context *clusterd return err } logger.Info("creating mgr external monitoring service") - _, err = k8sutil.CreateOrUpdateService(context.Clientset, cluster.Namespace, service) + _, err = k8sutil.CreateOrUpdateService(cluster.ClusterInfo.Context, context.Clientset, cluster.Namespace, service) if err != nil && !kerrors.IsAlreadyExists(err) { return errors.Wrap(err, "failed to create or update mgr service") } diff --git a/pkg/operator/ceph/cluster/mgr/dashboard.go b/pkg/operator/ceph/cluster/mgr/dashboard.go index 49b880d30b2a..173519ec49a2 100644 --- a/pkg/operator/ceph/cluster/mgr/dashboard.go +++ b/pkg/operator/ceph/cluster/mgr/dashboard.go @@ -62,7 +62,7 @@ func (c *Cluster) configureDashboardService(activeDaemon string) error { } if c.spec.Dashboard.Enabled { // expose the dashboard service - if _, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.clusterInfo.Namespace, dashboardService); err != nil { + if _, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, dashboardService); err != nil { return errors.Wrap(err, "failed to configure dashboard svc") } } else { diff --git a/pkg/operator/ceph/cluster/mgr/mgr.go b/pkg/operator/ceph/cluster/mgr/mgr.go index da52ffad7847..ae39169c3420 100644 --- a/pkg/operator/ceph/cluster/mgr/mgr.go +++ b/pkg/operator/ceph/cluster/mgr/mgr.go @@ -301,7 +301,7 @@ func (c *Cluster) reconcileServices(activeDaemon string) error { if err != nil { return err } - if _, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.clusterInfo.Namespace, service); err != nil { + if _, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, service); err != nil { return errors.Wrap(err, "failed to create mgr metrics service") } diff --git a/pkg/operator/ceph/cluster/mon/service.go b/pkg/operator/ceph/cluster/mon/service.go index ea6f3e8a2c4d..73df857c2cd9 100644 --- a/pkg/operator/ceph/cluster/mon/service.go +++ b/pkg/operator/ceph/cluster/mon/service.go @@ -67,7 +67,7 @@ func (c *Cluster) createService(mon *monConfig) (string, error) { } } - s, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.Namespace, svcDef) + s, err := k8sutil.CreateOrUpdateService(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, svcDef) if err != nil { return "", errors.Wrapf(err, "failed to create service for mon %s", mon.DaemonName) } diff --git a/pkg/operator/ceph/cr_manager.go b/pkg/operator/ceph/cr_manager.go index d182de47a0d4..c2dce22a1ef2 100644 --- a/pkg/operator/ceph/cr_manager.go +++ b/pkg/operator/ceph/cr_manager.go @@ -185,7 +185,7 @@ func (o *Operator) startCRDManager(context context.Context, mgrErrorCh chan erro return } if isPresent { - err := createWebhookService(o.context) + err := createWebhookService(context, o.context) if err != nil { mgrErrorCh <- errors.Wrap(err, "failed to create admission webhook service") return diff --git a/pkg/operator/ceph/csi/spec.go b/pkg/operator/ceph/csi/spec.go index 476b39c558c0..30a20b6e3703 100644 --- a/pkg/operator/ceph/csi/spec.go +++ b/pkg/operator/ceph/csi/spec.go @@ -458,7 +458,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI if err != nil { return errors.Wrapf(err, "failed to set owner reference to rbd service %q", rbdService) } - _, err = k8sutil.CreateOrUpdateService(r.context.Clientset, r.opConfig.OperatorNamespace, rbdService) + _, err = k8sutil.CreateOrUpdateService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, rbdService) if err != nil { return errors.Wrapf(err, "failed to create rbd service %q", rbdService.Name) } @@ -525,7 +525,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI if err != nil { return errors.Wrapf(err, "failed to set owner reference to cephfs service %q", cephfsService) } - _, err = k8sutil.CreateOrUpdateService(r.context.Clientset, r.opConfig.OperatorNamespace, cephfsService) + _, err = k8sutil.CreateOrUpdateService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, cephfsService) if err != nil { return errors.Wrapf(err, "failed to create cephfs service %q", cephfsService.Name) } @@ -587,7 +587,7 @@ func (r *ReconcileCSI) deleteCSIDriverResources(ver *version.Info, daemonset, de succeeded = false } - err = k8sutil.DeleteService(r.context.Clientset, r.opConfig.OperatorNamespace, service) + err = k8sutil.DeleteService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, service) if err != nil { logger.Errorf("failed to delete the %q. %v", service, err) succeeded = false diff --git a/pkg/operator/ceph/object/spec.go b/pkg/operator/ceph/object/spec.go index 2152b3275bd4..96cb2e2ad3c9 100644 --- a/pkg/operator/ceph/object/spec.go +++ b/pkg/operator/ceph/object/spec.go @@ -512,7 +512,7 @@ func (c *clusterConfig) reconcileService(cephObjectStore *cephv1.CephObjectStore return "", errors.Wrapf(err, "failed to set owner reference to ceph object store service %q", service.Name) } - svc, err := k8sutil.CreateOrUpdateService(c.context.Clientset, cephObjectStore.Namespace, service) + svc, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, cephObjectStore.Namespace, service) if err != nil { return "", errors.Wrapf(err, "failed to create or update object store %q service", cephObjectStore.Name) } diff --git a/pkg/operator/ceph/webhook.go b/pkg/operator/ceph/webhook.go index ed67c1cc38c1..ba15c717ba7a 100644 --- a/pkg/operator/ceph/webhook.go +++ b/pkg/operator/ceph/webhook.go @@ -72,7 +72,7 @@ func isSecretPresent(ctx context.Context, context *clusterd.Context) (bool, erro return true, nil } -func createWebhookService(context *clusterd.Context) error { +func createWebhookService(ctx context.Context, context *clusterd.Context) error { webhookService := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: admissionControllerAppName, @@ -93,7 +93,7 @@ func createWebhookService(context *clusterd.Context) error { }, } - _, err := k8sutil.CreateOrUpdateService(context.Clientset, namespace, &webhookService) + _, err := k8sutil.CreateOrUpdateService(ctx, context.Clientset, namespace, &webhookService) if err != nil { return err } diff --git a/pkg/operator/k8sutil/service.go b/pkg/operator/k8sutil/service.go index 51b73f2b6bf2..cd3e8bc6d5fb 100644 --- a/pkg/operator/k8sutil/service.go +++ b/pkg/operator/k8sutil/service.go @@ -28,9 +28,8 @@ import ( // CreateOrUpdateService creates a service or updates the service declaratively if it already exists. func CreateOrUpdateService( - clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service, + ctx context.Context, clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service, ) (*v1.Service, error) { - ctx := context.TODO() name := serviceDefinition.Name logger.Debugf("creating service %s", name) @@ -39,7 +38,7 @@ func CreateOrUpdateService( if !errors.IsAlreadyExists(err) { return nil, fmt.Errorf("failed to create service %s. %+v", name, err) } - s, err = UpdateService(clientset, namespace, serviceDefinition) + s, err = UpdateService(ctx, clientset, namespace, serviceDefinition) if err != nil { return nil, fmt.Errorf("failed to update service %s. %+v", name, err) } @@ -52,9 +51,8 @@ func CreateOrUpdateService( // UpdateService updates a service declaratively. If the service does not exist this is considered // an error condition. func UpdateService( - clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service, + ctx context.Context, clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service, ) (*v1.Service, error) { - ctx := context.TODO() name := serviceDefinition.Name logger.Debugf("updating service %s", name) existing, err := clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) @@ -69,8 +67,7 @@ func UpdateService( } // DeleteService deletes a Service and returns the error if any -func DeleteService(clientset kubernetes.Interface, namespace, name string) error { - ctx := context.TODO() +func DeleteService(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error { err := clientset.CoreV1().Services(namespace).Delete(ctx, name, metav1.DeleteOptions{}) if err != nil { if errors.IsNotFound(err) {