diff --git a/cmd/rook/ceph/cleanup.go b/cmd/rook/ceph/cleanup.go index 27fb80c6e5a4..e13e92a3067e 100644 --- a/cmd/rook/ceph/cleanup.go +++ b/cmd/rook/ceph/cleanup.go @@ -59,6 +59,8 @@ func startCleanUp(cmd *cobra.Command, args []string) error { rook.SetLogLevel() rook.LogStartupInfo(cleanUpCmd.Flags()) + ctx := cmd.Context() + logger.Info("starting cluster clean up") // Delete dataDirHostPath if dataDirHostPath != "" { @@ -67,7 +69,7 @@ func startCleanUp(cmd *cobra.Command, args []string) error { } namespace := os.Getenv(k8sutil.PodNamespaceEnvVar) - clusterInfo := client.AdminClusterInfo(namespace, "") + clusterInfo := client.AdminClusterInfo(ctx, namespace, "") clusterInfo.FSID = clusterFSID // Build Sanitizer diff --git a/cmd/rook/discover.go b/cmd/rook/discover.go index 468515a46b18..d33eeb658a97 100644 --- a/cmd/rook/discover.go +++ b/cmd/rook/discover.go @@ -51,8 +51,9 @@ func startDiscover(cmd *cobra.Command, args []string) error { rook.LogStartupInfo(discoverCmd.Flags()) context := rook.NewContext() + ctx := cmd.Context() - err := discover.Run(context, discoverDevicesInterval, usesCVInventory) + err := discover.Run(ctx, context, discoverDevicesInterval, usesCVInventory) if err != nil { rook.TerminateFatal(err) } diff --git a/pkg/daemon/ceph/client/command.go b/pkg/daemon/ceph/client/command.go index d94a8d0d09ed..0efff17945ce 100644 --- a/pkg/daemon/ceph/client/command.go +++ b/pkg/daemon/ceph/client/command.go @@ -165,7 +165,7 @@ func (c *CephToolCommand) run() ([]byte, error) { // Still forcing the check for the command if the behavior changes in the future if command == RBDTool { if c.RemoteExecution { - output, stderr, err = c.context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(ProxyAppLabel, CommandProxyInitContainerName, c.clusterInfo.Namespace, append([]string{command}, args...)...) + output, stderr, err = c.context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(c.clusterInfo.Context, ProxyAppLabel, CommandProxyInitContainerName, c.clusterInfo.Namespace, append([]string{command}, args...)...) if stderr != "" || err != nil { err = errors.Errorf("%s. %s", err.Error(), stderr) } diff --git a/pkg/daemon/ceph/client/info.go b/pkg/daemon/ceph/client/info.go index 3bfb1ea924d8..b4a736675398 100644 --- a/pkg/daemon/ceph/client/info.go +++ b/pkg/daemon/ceph/client/info.go @@ -89,7 +89,7 @@ func (c *ClusterInfo) NamespacedName() types.NamespacedName { // AdminClusterInfo() creates a ClusterInfo with the basic info to access the cluster // as an admin. -func AdminClusterInfo(namespace, name string) *ClusterInfo { +func AdminClusterInfo(ctx context.Context, namespace, name string) *ClusterInfo { ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(&metav1.OwnerReference{}, "") return &ClusterInfo{ Namespace: namespace, @@ -98,14 +98,14 @@ func AdminClusterInfo(namespace, name string) *ClusterInfo { }, name: name, OwnerInfo: ownerInfo, - Context: context.TODO(), + Context: ctx, } } // AdminTestClusterInfo() creates a ClusterInfo with the basic info to access the cluster // as an admin. This cluster info should only be used by unit or integration tests. 
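Both CLI entry points changed above (cleanup and discover) now take their context from cobra via cmd.Context() instead of letting downstream helpers invent one with context.TODO(). A minimal, self-contained sketch of that wiring, assuming a hypothetical demo command — the signal handling shown is an illustrative assumption, not part of this patch:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/spf13/cobra"
)

func main() {
	// The context installed here is what cmd.Context() returns inside RunE,
	// mirroring how startCleanUp/startDiscover obtain their ctx.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
	defer stop()

	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, args []string) error {
			ctx := cmd.Context() // same call the patched commands now make
			<-ctx.Done()         // block like a long-running daemon would
			fmt.Println("shutting down:", ctx.Err())
			return nil
		},
	}
	if err := cmd.ExecuteContext(ctx); err != nil {
		os.Exit(1)
	}
}
```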
func AdminTestClusterInfo(namespace string) *ClusterInfo { - return AdminClusterInfo(namespace, "testing") + return AdminClusterInfo(context.TODO(), namespace, "testing") } // IsInitialized returns true if the critical information in the ClusterInfo struct has been filled diff --git a/pkg/daemon/ceph/osd/kms/kms.go b/pkg/daemon/ceph/osd/kms/kms.go index cf1734284ab3..c19bd4015eae 100644 --- a/pkg/daemon/ceph/osd/kms/kms.go +++ b/pkg/daemon/ceph/osd/kms/kms.go @@ -86,7 +86,7 @@ func (c *Config) PutSecret(secretName, secretValue string) error { } if c.IsVault() { // Store the secret in Vault - v, err := InitVault(c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) + v, err := InitVault(c.ClusterInfo.Context, c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) if err != nil { return errors.Wrap(err, "failed to init vault kms") } @@ -123,7 +123,7 @@ func (c *Config) GetSecret(secretName string) (string, error) { var value string if c.IsVault() { // Store the secret in Vault - v, err := InitVault(c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) + v, err := InitVault(c.ClusterInfo.Context, c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) if err != nil { return "", errors.Wrap(err, "failed to init vault") } @@ -153,7 +153,7 @@ func (c *Config) GetSecret(secretName string) (string, error) { func (c *Config) DeleteSecret(secretName string) error { if c.IsVault() { // Store the secret in Vault - v, err := InitVault(c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) + v, err := InitVault(c.ClusterInfo.Context, c.context, c.ClusterInfo.Namespace, c.clusterSpec.Security.KeyManagementService.ConnectionDetails) if err != nil { return errors.Wrap(err, "failed to delete secret in vault") } @@ -263,7 +263,7 @@ func ValidateConnectionDetails(ctx context.Context, clusterdContext *clusterd.Co // Validate KMS provider connection details for each provider switch provider { case secrets.TypeVault: - err := validateVaultConnectionDetails(clusterdContext, ns, securitySpec.KeyManagementService.ConnectionDetails) + err := validateVaultConnectionDetails(ctx, clusterdContext, ns, securitySpec.KeyManagementService.ConnectionDetails) if err != nil { return errors.Wrap(err, "failed to validate vault connection details") } @@ -273,7 +273,7 @@ func ValidateConnectionDetails(ctx context.Context, clusterdContext *clusterd.Co case VaultKVSecretEngineKey: // Append Backend Version if not already present if GetParam(securitySpec.KeyManagementService.ConnectionDetails, vault.VaultBackendKey) == "" { - backendVersion, err := BackendVersion(clusterdContext, ns, securitySpec.KeyManagementService.ConnectionDetails) + backendVersion, err := BackendVersion(ctx, clusterdContext, ns, securitySpec.KeyManagementService.ConnectionDetails) if err != nil { return errors.Wrap(err, "failed to get backend version") } diff --git a/pkg/daemon/ceph/osd/kms/kms_test.go b/pkg/daemon/ceph/osd/kms/kms_test.go index 459d8718beb0..4a4767ce01a1 100644 --- a/pkg/daemon/ceph/osd/kms/kms_test.go +++ b/pkg/daemon/ceph/osd/kms/kms_test.go @@ -34,7 +34,7 @@ import ( func TestValidateConnectionDetails(t *testing.T) { ctx := context.TODO() // Placeholder - context := &clusterd.Context{Clientset: test.New(t, 3)} + clusterdContext := &clusterd.Context{Clientset: test.New(t, 3)} securitySpec := 
&cephv1.SecuritySpec{KeyManagementService: cephv1.KeyManagementServiceSpec{ConnectionDetails: map[string]string{}}} ns := "rook-ceph" vaultSecret := &v1.Secret{ @@ -63,7 +63,7 @@ func TestValidateConnectionDetails(t *testing.T) { }} t.Run("no kms provider given", func(t *testing.T) { - err := ValidateConnectionDetails(ctx, context, securitySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to validate kms config \"KMS_PROVIDER\". cannot be empty") securitySpec.KeyManagementService.ConnectionDetails["KMS_PROVIDER"] = "vault" @@ -71,24 +71,24 @@ func TestValidateConnectionDetails(t *testing.T) { t.Run("vault - no token object", func(t *testing.T) { securitySpec.KeyManagementService.TokenSecretName = "vault-token" - err := ValidateConnectionDetails(ctx, context, securitySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to fetch kms token secret \"vault-token\": secrets \"vault-token\" not found") }) t.Run("vault - token secret present but empty content", func(t *testing.T) { - _, err := context.Clientset.CoreV1().Secrets(ns).Create(ctx, vaultSecret, metav1.CreateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Create(ctx, vaultSecret, metav1.CreateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, securitySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to read k8s kms secret \"token\" key \"vault-token\" (not found or empty)") }) t.Run("vault - token key does not exist", func(t *testing.T) { vaultSecret.Data = map[string][]byte{"foo": []byte("bar")} - _, err := context.Clientset.CoreV1().Secrets(ns).Update(ctx, vaultSecret, metav1.UpdateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Update(ctx, vaultSecret, metav1.UpdateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, securitySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to read k8s kms secret \"token\" key \"vault-token\" (not found or empty)") }) @@ -96,9 +96,9 @@ func TestValidateConnectionDetails(t *testing.T) { // Success: token content is ok t.Run("vault - token content is ok", func(t *testing.T) { vaultSecret.Data["token"] = []byte("token") - _, err := context.Clientset.CoreV1().Secrets(ns).Update(ctx, vaultSecret, metav1.UpdateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Update(ctx, vaultSecret, metav1.UpdateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, securitySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to validate vault connection details: failed to find connection details \"VAULT_ADDR\"") securitySpec.KeyManagementService.ConnectionDetails["VAULT_ADDR"] = "https://1.1.1.1:8200" @@ -106,24 +106,24 @@ func TestValidateConnectionDetails(t *testing.T) { t.Run("vault - TLS is configured but secrets do not exist", func(t *testing.T) { securitySpec.KeyManagementService.ConnectionDetails["VAULT_CACERT"] = "vault-ca-secret" - err := ValidateConnectionDetails(ctx, context, securitySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") 
assert.EqualError(t, err, "failed to validate vault connection details: failed to find TLS connection details k8s secret \"vault-ca-secret\"") }) t.Run("vault - TLS secret exists but empty key", func(t *testing.T) { - _, err := context.Clientset.CoreV1().Secrets(ns).Create(ctx, tlsSecret, metav1.CreateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Create(ctx, tlsSecret, metav1.CreateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, securitySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to validate vault connection details: failed to find TLS connection key \"cert\" for \"VAULT_CACERT\" in k8s secret \"vault-ca-secret\"") }) t.Run("vault - success TLS config is correct", func(t *testing.T) { tlsSecret.Data = map[string][]byte{"cert": []byte("envnrevbnbvsbjkrtn")} - _, err := context.Clientset.CoreV1().Secrets(ns).Update(ctx, tlsSecret, metav1.UpdateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Update(ctx, tlsSecret, metav1.UpdateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, securitySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.NoError(t, err, "") }) @@ -136,7 +136,7 @@ func TestValidateConnectionDetails(t *testing.T) { vault.TestWaitActive(t, core) client := cluster.Cores[0].Client // Mock the client here - vaultClient = func(clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { + vaultClient = func(ctx context.Context, clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { return client, nil } if err := client.Sys().Mount("rook/", &api.MountInput{ @@ -156,13 +156,13 @@ func TestValidateConnectionDetails(t *testing.T) { TokenSecretName: "vault-token", }, } - err := ValidateConnectionDetails(ctx, context, securitySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, securitySpec, ns) assert.NoError(t, err, "") assert.Equal(t, securitySpec.KeyManagementService.ConnectionDetails["VAULT_BACKEND"], "v2") }) t.Run("ibm kp - fail no token specified, only token is supported", func(t *testing.T) { - err := ValidateConnectionDetails(ctx, context, ibmSecuritySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, ibmSecuritySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to validate kms configuration (missing token in spec)") ibmSecuritySpec.KeyManagementService.TokenSecretName = "ibm-token" @@ -170,25 +170,25 @@ func TestValidateConnectionDetails(t *testing.T) { }) t.Run("ibm kp - token present but no key for service key", func(t *testing.T) { - _, err := context.Clientset.CoreV1().Secrets(ns).Create(ctx, ibmSecret, metav1.CreateOptions{}) + _, err := clusterdContext.Clientset.CoreV1().Secrets(ns).Create(ctx, ibmSecret, metav1.CreateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, ibmSecuritySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, ibmSecuritySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to read k8s kms secret \"IBM_KP_SERVICE_API_KEY\" key \"ibm-token\" (not found or empty)") }) t.Run("ibm kp - token ok but no instance id", func(t *testing.T) { ibmSecret.Data["IBM_KP_SERVICE_API_KEY"] = []byte("foo") - _, err := context.Clientset.CoreV1().Secrets(ns).Update(ctx, ibmSecret, metav1.UpdateOptions{}) + _, err := 
clusterdContext.Clientset.CoreV1().Secrets(ns).Update(ctx, ibmSecret, metav1.UpdateOptions{}) assert.NoError(t, err) - err = ValidateConnectionDetails(ctx, context, ibmSecuritySpec, ns) + err = ValidateConnectionDetails(ctx, clusterdContext, ibmSecuritySpec, ns) assert.Error(t, err, "") assert.EqualError(t, err, "failed to validate kms config \"IBM_KP_SERVICE_INSTANCE_ID\". cannot be empty") ibmSecuritySpec.KeyManagementService.ConnectionDetails["IBM_KP_SERVICE_INSTANCE_ID"] = "foo" }) t.Run("ibm kp - success", func(t *testing.T) { - err := ValidateConnectionDetails(ctx, context, ibmSecuritySpec, ns) + err := ValidateConnectionDetails(ctx, clusterdContext, ibmSecuritySpec, ns) assert.NoError(t, err, "") // IBM_KP_SERVICE_API_KEY must be appended to the details so that the client can be built with // all the details diff --git a/pkg/daemon/ceph/osd/kms/vault.go b/pkg/daemon/ceph/osd/kms/vault.go index 8fe99810f56a..b8720c442605 100644 --- a/pkg/daemon/ceph/osd/kms/vault.go +++ b/pkg/daemon/ceph/osd/kms/vault.go @@ -75,7 +75,7 @@ type removeCertFilesFunction func() */ // InitVault inits the secret store -func InitVault(context *clusterd.Context, namespace string, config map[string]string) (secrets.Secrets, error) { +func InitVault(ctx context.Context, context *clusterd.Context, namespace string, config map[string]string) (secrets.Secrets, error) { c := make(map[string]interface{}) // So that we don't alter the content of c.config for later iterations @@ -86,7 +86,7 @@ func InitVault(context *clusterd.Context, namespace string, config map[string]st } // Populate TLS config - newConfigWithTLS, removeCertFiles, err := configTLS(context, namespace, oriConfig) + newConfigWithTLS, removeCertFiles, err := configTLS(ctx, context, namespace, oriConfig) if err != nil { return nil, errors.Wrap(err, "failed to initialize vault tls configuration") } @@ -111,8 +111,7 @@ func InitVault(context *clusterd.Context, namespace string, config map[string]st // The signature has named result parameters to help building 'defer' statements especially for the // content of removeCertFiles which needs to be populated by the files to remove if no errors and be // nil on errors -func configTLS(clusterdContext *clusterd.Context, namespace string, config map[string]string) (newConfig map[string]string, removeCertFiles removeCertFilesFunction, retErr error) { - ctx := context.TODO() +func configTLS(ctx context.Context, clusterdContext *clusterd.Context, namespace string, config map[string]string) (newConfig map[string]string, removeCertFiles removeCertFilesFunction, retErr error) { var filesToRemove []*os.File defer func() { @@ -248,8 +247,7 @@ func (c *Config) IsVault() bool { return c.Provider == secrets.TypeVault } -func validateVaultConnectionDetails(clusterdContext *clusterd.Context, ns string, kmsConfig map[string]string) error { - ctx := context.TODO() +func validateVaultConnectionDetails(ctx context.Context, clusterdContext *clusterd.Context, ns string, kmsConfig map[string]string) error { for _, option := range vaultMandatoryConnectionDetails { if GetParam(kmsConfig, option) == "" { return errors.Errorf("failed to find connection details %q", option) diff --git a/pkg/daemon/ceph/osd/kms/vault_api.go b/pkg/daemon/ceph/osd/kms/vault_api.go index c2e489b925de..f409ac7b2d40 100644 --- a/pkg/daemon/ceph/osd/kms/vault_api.go +++ b/pkg/daemon/ceph/osd/kms/vault_api.go @@ -17,6 +17,7 @@ limitations under the License. 
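The net effect of threading ctx through InitVault, configTLS, and validateVaultConnectionDetails is that the Kubernetes and Vault round-trips they make now observe the caller's cancellation and deadlines instead of running under an unbounded context.TODO(). A self-contained sketch of that behaviour — fetchTLSSecret and its timing are hypothetical stand-ins, not Rook code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchTLSSecret stands in for a helper like configTLS: it receives the
// caller's ctx, so a cancelled or timed-out reconcile aborts the lookup.
func fetchTLSSecret(ctx context.Context, name string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-time.After(50 * time.Millisecond): // stands in for a Kubernetes API call
		return "PEM bytes for " + name, nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	// Under context.TODO() this call could only ever run to completion;
	// with the threaded ctx it returns context.DeadlineExceeded instead.
	_, err := fetchTLSSecret(ctx, "vault-ca-cert")
	fmt.Println(err)
}
```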
package kms import ( + "context" "strings" "github.com/libopenstorage/secrets/vault" @@ -38,7 +39,7 @@ var vaultClient = newVaultClient // newVaultClient returns a vault client, there is no need for any secretConfig validation // Since this is called after an already validated call InitVault() -func newVaultClient(clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { +func newVaultClient(ctx context.Context, clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { // DefaultConfig uses the environment variables if present. config := api.DefaultConfig() @@ -56,7 +57,7 @@ func newVaultClient(clusterdContext *clusterd.Context, namespace string, secretC } // Populate TLS config - newConfigWithTLS, removeCertFiles, err := configTLS(clusterdContext, namespace, localSecretConfig) + newConfigWithTLS, removeCertFiles, err := configTLS(ctx, clusterdContext, namespace, localSecretConfig) if err != nil { return nil, errors.Wrap(err, "failed to initialize vault tls configuration") } @@ -106,7 +107,7 @@ func newVaultClient(clusterdContext *clusterd.Context, namespace string, secretC return client, nil } -func BackendVersion(clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (string, error) { +func BackendVersion(ctx context.Context, clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (string, error) { v1 := "v1" v2 := "v2" @@ -125,7 +126,7 @@ func BackendVersion(clusterdContext *clusterd.Context, namespace string, secretC return v2, nil default: // Initialize Vault client - vaultClient, err := vaultClient(clusterdContext, namespace, secretConfig) + vaultClient, err := vaultClient(ctx, clusterdContext, namespace, secretConfig) if err != nil { return "", errors.Wrap(err, "failed to initialize vault client") } diff --git a/pkg/daemon/ceph/osd/kms/vault_api_test.go b/pkg/daemon/ceph/osd/kms/vault_api_test.go index 50863abcfa57..6b8ea4bd4748 100644 --- a/pkg/daemon/ceph/osd/kms/vault_api_test.go +++ b/pkg/daemon/ceph/osd/kms/vault_api_test.go @@ -40,9 +40,10 @@ func TestBackendVersion(t *testing.T) { core := cluster.Cores[0].Core vault.TestWaitActive(t, core) client := cluster.Cores[0].Client + ctx := context.TODO() // Mock the client here - vaultClient = func(clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { + vaultClient = func(ctx context.Context, clusterdContext *clusterd.Context, namespace string, secretConfig map[string]string) (*api.Client, error) { return client, nil } @@ -76,7 +77,7 @@ func TestBackendVersion(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := BackendVersion(&clusterd.Context{}, "ns", tt.args.secretConfig) + got, err := BackendVersion(ctx, &clusterd.Context{}, "ns", tt.args.secretConfig) if (err != nil) != tt.wantErr { t.Errorf("BackendVersion() error = %v, wantErr %v", err, tt.wantErr) return @@ -186,7 +187,7 @@ cjLxGL8tcZjHKg== } // Populate TLS config - newConfigWithTLS, removeCertFiles, err := configTLS(context, ns, secretConfig) + newConfigWithTLS, removeCertFiles, err := configTLS(ctx, context, ns, secretConfig) assert.NoError(t, err) defer removeCertFiles() diff --git a/pkg/daemon/ceph/osd/kms/vault_test.go b/pkg/daemon/ceph/osd/kms/vault_test.go index 287abbb62727..2f2d866be855 100644 --- a/pkg/daemon/ceph/osd/kms/vault_test.go +++ b/pkg/daemon/ceph/osd/kms/vault_test.go @@ -69,7 +69,7 @@ func 
Test_configTLS(t *testing.T) { "VAULT_BACKEND_PATH": "vault", } // No tls config - _, removeCertFiles, err := configTLS(context, ns, config) + _, removeCertFiles, err := configTLS(ctx, context, ns, config) assert.NoError(t, err) defer removeCertFiles() }) @@ -83,7 +83,7 @@ func Test_configTLS(t *testing.T) { "VAULT_CACERT": "/etc/vault/cacert", "VAULT_SKIP_VERIFY": "false", } - config, removeCertFiles, err := configTLS(context, ns, config) + config, removeCertFiles, err := configTLS(ctx, context, ns, config) assert.NoError(t, err) assert.Equal(t, "/etc/vault/cacert", config["VAULT_CACERT"]) defer removeCertFiles() @@ -98,7 +98,7 @@ func Test_configTLS(t *testing.T) { "VAULT_CACERT": "vault-ca-cert", "VAULT_SKIP_VERIFY": "false", } - _, removeCertFiles, err := configTLS(context, ns, config) + _, removeCertFiles, err := configTLS(ctx, context, ns, config) assert.Error(t, err) assert.EqualError(t, err, "failed to fetch tls k8s secret \"vault-ca-cert\": secrets \"vault-ca-cert\" not found") assert.Nil(t, removeCertFiles) @@ -122,7 +122,7 @@ func Test_configTLS(t *testing.T) { } _, err := context.Clientset.CoreV1().Secrets(ns).Create(ctx, s, metav1.CreateOptions{}) assert.NoError(t, err) - config, removeCertFiles, err := configTLS(context, ns, config) + config, removeCertFiles, err := configTLS(ctx, context, ns, config) defer removeCertFiles() assert.NoError(t, err) assert.NotEqual(t, "vault-ca-cert", config["VAULT_CACERT"]) @@ -167,7 +167,7 @@ func Test_configTLS(t *testing.T) { assert.NoError(t, err) _, err = context.Clientset.CoreV1().Secrets(ns).Create(ctx, sClKey, metav1.CreateOptions{}) assert.NoError(t, err) - config, removeCertFiles, err := configTLS(context, ns, config) + config, removeCertFiles, err := configTLS(ctx, context, ns, config) assert.NoError(t, err) assert.NotEqual(t, "vault-ca-cert", config["VAULT_CACERT"]) assert.NotEqual(t, "vault-client-cert", config["VAULT_CLIENT_CERT"]) @@ -191,7 +191,7 @@ func Test_configTLS(t *testing.T) { "VAULT_CLIENT_CERT": "vault-client-cert", "VAULT_CLIENT_KEY": "vault-client-key", } - config, removeCertFiles, err := configTLS(context, ns, config) + config, removeCertFiles, err := configTLS(ctx, context, ns, config) assert.NoError(t, err) assert.NotEqual(t, "vault-ca-cert", config["VAULT_CACERT"]) assert.NotEqual(t, "vault-client-cert", config["VAULT_CLIENT_CERT"]) @@ -246,7 +246,7 @@ func Test_configTLS(t *testing.T) { "VAULT_CLIENT_CERT": "vault-client-cert", "VAULT_CLIENT_KEY": "vault-client-key", } - _, _, err := configTLS(context, ns, config) + _, _, err := configTLS(ctx, context, ns, config) assert.Error(t, err) assert.EqualError(t, err, "failed to generate temp file for k8s secret \"vault-ca-cert\" content: error creating tmp file") assert.NoFileExists(t, os.Getenv("ROOK_TMP_FILE")) diff --git a/pkg/daemon/discover/discover.go b/pkg/daemon/discover/discover.go index 099a307f884e..7fd2ee8212c5 100644 --- a/pkg/daemon/discover/discover.go +++ b/pkg/daemon/discover/discover.go @@ -74,7 +74,7 @@ type CephVolumeInventory struct { } // Run is the entry point of that package execution -func Run(context *clusterd.Context, probeInterval time.Duration, useCV bool) error { +func Run(ctx context.Context, context *clusterd.Context, probeInterval time.Duration, useCV bool) error { if context == nil { return fmt.Errorf("nil context") } @@ -87,7 +87,7 @@ func Run(context *clusterd.Context, probeInterval time.Duration, useCV bool) err sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGTERM) - err := updateDeviceCM(context) + err := 
updateDeviceCM(ctx, context) if err != nil { logger.Infof("failed to update device configmap: %v", err) return err @@ -101,13 +101,13 @@ func Run(context *clusterd.Context, probeInterval time.Duration, useCV bool) err logger.Infof("shutdown signal received, exiting...") return nil case <-time.After(probeInterval): - if err := updateDeviceCM(context); err != nil { + if err := updateDeviceCM(ctx, context); err != nil { logger.Errorf("failed to update device configmap during probe interval. %v", err) } case _, ok := <-udevEvents: if ok { logger.Info("trigger probe from udev event") - if err := updateDeviceCM(context); err != nil { + if err := updateDeviceCM(ctx, context); err != nil { logger.Errorf("failed to update device configmap triggered from udev event. %v", err) } } else { @@ -323,8 +323,7 @@ func DeviceListsEqual(old, new string) (bool, error) { return checkDeviceListsEqual(oldDevs, newDevs), nil } -func updateDeviceCM(clusterdContext *clusterd.Context) error { - ctx := context.TODO() +func updateDeviceCM(ctx context.Context, clusterdContext *clusterd.Context) error { logger.Infof("updating device configmap") devices, err := probeDevices(clusterdContext) if err != nil { diff --git a/pkg/operator/ceph/cluster/cluster.go b/pkg/operator/ceph/cluster/cluster.go index b689ed43d1b0..abe7b57b195b 100755 --- a/pkg/operator/ceph/cluster/cluster.go +++ b/pkg/operator/ceph/cluster/cluster.go @@ -67,19 +67,19 @@ type clusterHealth struct { internalCancel context.CancelFunc } -func newCluster(c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster { +func newCluster(ctx context.Context, c *cephv1.CephCluster, context *clusterd.Context, ownerInfo *k8sutil.OwnerInfo) *cluster { return &cluster{ // at this phase of the cluster creation process, the identity components of the cluster are // not yet established. we reserve this struct which is filled in as soon as the cluster's // identity can be established. 
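As the hunk just below shows, newCluster now hands the operator-manager context to client.AdminClusterInfo and mon.New, so the context is captured on ClusterInfo at construction time and later consumers (CephToolCommand.run, the PDB reconcilers) read clusterInfo.Context rather than calling context.TODO(). The context package docs generally discourage storing a Context in a struct, but ClusterInfo already carried a Context field; the change only stops defaulting it. A self-contained sketch of that flow, with clusterInfo and runCephCommand as hypothetical stand-ins:

```go
package main

import (
	"context"
	"fmt"
)

// clusterInfo mirrors the role cephclient.ClusterInfo plays here: the context
// is captured once at construction and reused by helpers that previously
// minted their own context.TODO().
type clusterInfo struct {
	Namespace string
	Context   context.Context
}

func newClusterInfo(ctx context.Context, namespace string) *clusterInfo {
	return &clusterInfo{Namespace: namespace, Context: ctx}
}

// runCephCommand stands in for a downstream consumer such as
// CephToolCommand.run, which now pulls the context off ClusterInfo.
func runCephCommand(info *clusterInfo, args ...string) error {
	if err := info.Context.Err(); err != nil {
		return fmt.Errorf("cluster %q: reconcile cancelled: %w", info.Namespace, err)
	}
	fmt.Println("exec in", info.Namespace, args)
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	info := newClusterInfo(ctx, "rook-ceph")
	_ = runCephCommand(info, "ceph", "status") // runs normally
	cancel()
	fmt.Println(runCephCommand(info, "ceph", "status")) // fails fast after shutdown
}
```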
- ClusterInfo: client.AdminClusterInfo(c.Namespace, c.Name), + ClusterInfo: client.AdminClusterInfo(ctx, c.Namespace, c.Name), Namespace: c.Namespace, Spec: &c.Spec, context: context, namespacedName: types.NamespacedName{Namespace: c.Namespace, Name: c.Name}, monitoringRoutines: make(map[string]*clusterHealth), ownerInfo: ownerInfo, - mons: mon.New(context, c.Namespace, c.Spec, ownerInfo), + mons: mon.New(ctx, context, c.Namespace, c.Spec, ownerInfo), // update observedGeneration with current generation value, // because generation can be changed before reconile got completed // CR status will be updated at end of reconcile, so to reflect the reconcile has finished diff --git a/pkg/operator/ceph/cluster/cluster_external.go b/pkg/operator/ceph/cluster/cluster_external.go index aea264f51b71..95120a2fb025 100644 --- a/pkg/operator/ceph/cluster/cluster_external.go +++ b/pkg/operator/ceph/cluster/cluster_external.go @@ -108,7 +108,7 @@ func (c *ClusterController) configureExternalCephCluster(cluster *cluster) error } // Create CSI config map - err = csi.CreateCsiConfigMap(c.namespacedName.Namespace, c.context.Clientset, cluster.ownerInfo) + err = csi.CreateCsiConfigMap(c.OpManagerCtx, c.namespacedName.Namespace, c.context.Clientset, cluster.ownerInfo) if err != nil { return errors.Wrap(err, "failed to create csi config map") } diff --git a/pkg/operator/ceph/cluster/controller.go b/pkg/operator/ceph/cluster/controller.go index f8a1a5c195ab..254e82f979e5 100644 --- a/pkg/operator/ceph/cluster/controller.go +++ b/pkg/operator/ceph/cluster/controller.go @@ -177,7 +177,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco }, }, handler.EnqueueRequestsFromMapFunc(handlerFunc), - predicateForNodeWatcher(mgr.GetClient(), context)) + predicateForNodeWatcher(opManagerContext, mgr.GetClient(), context)) if err != nil { return err } @@ -277,10 +277,10 @@ func (r *ReconcileCephCluster) reconcileDelete(cephCluster *cephv1.CephCluster) return reconcile.Result{}, *cephCluster, err } if !deps.Empty() { - err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephCluster, deps) + err := reporting.ReportDeletionBlockedDueToDependents(r.opManagerContext, logger, r.client, cephCluster, deps) return opcontroller.WaitForRequeueIfFinalizerBlocked, *cephCluster, err } - reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.clusterController.recorder, cephCluster) + reporting.ReportDeletionNotBlockedDueToDependents(r.opManagerContext, logger, r.client, r.clusterController.recorder, cephCluster) doCleanup := true @@ -340,7 +340,7 @@ func (c *ClusterController) reconcileCephCluster(clusterObj *cephv1.CephCluster, cluster, ok := c.clusterMap[clusterObj.Namespace] if !ok { // It's a new cluster so let's populate the struct - cluster = newCluster(clusterObj, c.context, ownerInfo) + cluster = newCluster(c.OpManagerCtx, clusterObj, c.context, ownerInfo) } cluster.namespacedName = c.namespacedName // updating observedGeneration in cluster if it's not the first reconcile @@ -520,7 +520,7 @@ func (c *ClusterController) deleteOSDEncryptionKeyFromKMS(currentCluster *cephv1 } // Fetch PVCs - osdPVCs, _, err := osd.GetExistingPVCs(c.context, currentCluster.Namespace) + osdPVCs, _, err := osd.GetExistingPVCs(c.OpManagerCtx, c.context, currentCluster.Namespace) if err != nil { return errors.Wrap(err, "failed to list osd pvc") } diff --git a/pkg/operator/ceph/cluster/mgr/drain.go b/pkg/operator/ceph/cluster/mgr/drain.go index f4892ce771c2..6671220ccfb4 100644 --- 
a/pkg/operator/ceph/cluster/mgr/drain.go +++ b/pkg/operator/ceph/cluster/mgr/drain.go @@ -17,8 +17,6 @@ limitations under the License. package mgr import ( - "context" - "github.com/pkg/errors" "github.com/rook/rook/pkg/operator/k8sutil" policyv1 "k8s.io/api/policy/v1" @@ -58,7 +56,7 @@ func (c *Cluster) reconcileMgrPDB() error { } return nil } - op, err := controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc) + op, err := controllerutil.CreateOrUpdate(c.clusterInfo.Context, c.context.Client, pdb, mutateFunc) if err != nil { return errors.Wrapf(err, "failed to reconcile mgr pdb on op %q", op) } @@ -74,7 +72,7 @@ func (c *Cluster) reconcileMgrPDB() error { } return nil } - op, err := controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc) + op, err := controllerutil.CreateOrUpdate(c.clusterInfo.Context, c.context.Client, pdb, mutateFunc) if err != nil { return errors.Wrapf(err, "failed to reconcile mgr pdb on op %q", op) } @@ -90,7 +88,7 @@ func (c *Cluster) deleteMgrPDB() { } if usePDBV1Beta1 { mgrPDB := &policyv1beta1.PodDisruptionBudget{} - err := c.context.Client.Get(context.TODO(), pdbRequest, mgrPDB) + err := c.context.Client.Get(c.clusterInfo.Context, pdbRequest, mgrPDB) if err != nil { if !kerrors.IsNotFound(err) { logger.Errorf("failed to get mgr pdb %q. %v", mgrPDBName, err) @@ -98,14 +96,14 @@ func (c *Cluster) deleteMgrPDB() { return } logger.Debugf("ensuring the mgr pdb %q is deleted", mgrPDBName) - err = c.context.Client.Delete(context.TODO(), mgrPDB) + err = c.context.Client.Delete(c.clusterInfo.Context, mgrPDB) if err != nil { logger.Errorf("failed to delete mgr pdb %q. %v", mgrPDBName, err) return } } mgrPDB := &policyv1.PodDisruptionBudget{} - err = c.context.Client.Get(context.TODO(), pdbRequest, mgrPDB) + err = c.context.Client.Get(c.clusterInfo.Context, pdbRequest, mgrPDB) if err != nil { if !kerrors.IsNotFound(err) { logger.Errorf("failed to get mgr pdb %q. %v", mgrPDBName, err) @@ -113,7 +111,7 @@ func (c *Cluster) deleteMgrPDB() { return } logger.Debugf("ensuring the mgr pdb %q is deleted", mgrPDBName) - err = c.context.Client.Delete(context.TODO(), mgrPDB) + err = c.context.Client.Delete(c.clusterInfo.Context, mgrPDB) if err != nil { logger.Errorf("failed to delete mgr pdb %q. %v", mgrPDBName, err) } diff --git a/pkg/operator/ceph/cluster/mon/config.go b/pkg/operator/ceph/cluster/mon/config.go index 4750458f450f..57860d9c152f 100644 --- a/pkg/operator/ceph/cluster/mon/config.go +++ b/pkg/operator/ceph/cluster/mon/config.go @@ -137,7 +137,7 @@ func CreateOrLoadClusterInfo(clusterdContext *clusterd.Context, context context. 
} // get the existing monitor config - clusterInfo.Monitors, maxMonID, monMapping, err = loadMonConfig(clusterdContext.Clientset, namespace) + clusterInfo.Monitors, maxMonID, monMapping, err = loadMonConfig(context, clusterdContext.Clientset, namespace) if err != nil { return nil, maxMonID, monMapping, errors.Wrap(err, "failed to get mon config") } @@ -175,8 +175,7 @@ func WriteConnectionConfig(context *clusterd.Context, clusterInfo *cephclient.Cl } // loadMonConfig returns the monitor endpoints and maxMonID -func loadMonConfig(clientset kubernetes.Interface, namespace string) (map[string]*cephclient.MonInfo, int, *Mapping, error) { - ctx := context.TODO() +func loadMonConfig(ctx context.Context, clientset kubernetes.Interface, namespace string) (map[string]*cephclient.MonInfo, int, *Mapping, error) { monEndpointMap := map[string]*cephclient.MonInfo{} maxMonID := -1 monMapping := &Mapping{ diff --git a/pkg/operator/ceph/cluster/mon/drain.go b/pkg/operator/ceph/cluster/mon/drain.go index 0a31d4187ebb..d1b745fdf283 100644 --- a/pkg/operator/ceph/cluster/mon/drain.go +++ b/pkg/operator/ceph/cluster/mon/drain.go @@ -17,8 +17,6 @@ limitations under the License. package mon import ( - "context" - "github.com/pkg/errors" "github.com/rook/rook/pkg/operator/k8sutil" policyv1 "k8s.io/api/policy/v1" @@ -75,7 +73,7 @@ func (c *Cluster) createOrUpdateMonPDB(maxUnavailable int32) (controllerutil.Ope } return nil } - return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc) + return controllerutil.CreateOrUpdate(c.ClusterInfo.Context, c.context.Client, pdb, mutateFunc) } pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: objectMeta} @@ -87,7 +85,7 @@ func (c *Cluster) createOrUpdateMonPDB(maxUnavailable int32) (controllerutil.Ope } return nil } - return controllerutil.CreateOrUpdate(context.TODO(), c.context.Client, pdb, mutateFunc) + return controllerutil.CreateOrUpdate(c.ClusterInfo.Context, c.context.Client, pdb, mutateFunc) } // blockMonDrain makes MaxUnavailable in mon PDB to 0 to block any voluntary mon drains diff --git a/pkg/operator/ceph/cluster/mon/drain_test.go b/pkg/operator/ceph/cluster/mon/drain_test.go index 2ff565d21ae7..0bff39becf4c 100644 --- a/pkg/operator/ceph/cluster/mon/drain_test.go +++ b/pkg/operator/ceph/cluster/mon/drain_test.go @@ -38,6 +38,7 @@ const ( ) func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster, k8sVersion string) *Cluster { + ctx := context.TODO() ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() scheme := scheme.Scheme err := policyv1.AddToScheme(scheme) @@ -47,7 +48,7 @@ func createFakeCluster(t *testing.T, cephClusterObj *cephv1.CephCluster, k8sVers cl := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects().Build() clientset := test.New(t, 3) - c := New(&clusterd.Context{Client: cl, Clientset: clientset}, mockNamespace, cephClusterObj.Spec, ownerInfo) + c := New(ctx, &clusterd.Context{Client: cl, Clientset: clientset}, mockNamespace, cephClusterObj.Spec, ownerInfo) test.SetFakeKubernetesVersion(clientset, k8sVersion) return c } diff --git a/pkg/operator/ceph/cluster/mon/health_test.go b/pkg/operator/ceph/cluster/mon/health_test.go index b7c27483ed16..329607f3ab70 100644 --- a/pkg/operator/ceph/cluster/mon/health_test.go +++ b/pkg/operator/ceph/cluster/mon/health_test.go @@ -63,7 +63,7 @@ func TestCheckHealth(t *testing.T) { Executor: executor, } ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, context, "ns", 
cephv1.ClusterSpec{}, ownerInfo) // clusterInfo is nil so we return err err := c.checkHealth(ctx) assert.NotNil(t, err) @@ -151,7 +151,7 @@ func TestCheckHealth(t *testing.T) { } func TestSkipMonFailover(t *testing.T) { - c := New(&clusterd.Context{}, "ns", cephv1.ClusterSpec{}, nil) + c := New(context.TODO(), &clusterd.Context{}, "ns", cephv1.ClusterSpec{}, nil) c.ClusterInfo = clienttest.CreateTestClusterInfo(1) monName := "arb" @@ -196,7 +196,7 @@ func TestEvictMonOnSameNode(t *testing.T) { } context := &clusterd.Context{Clientset: clientset, ConfigDir: configDir, Executor: executor} ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, context, "ns", cephv1.ClusterSpec{}, ownerInfo) setCommonMonProperties(c, 1, cephv1.MonSpec{Count: 0}, "myversion") c.maxMonID = 2 c.waitForStart = false @@ -251,7 +251,7 @@ func TestScaleMonDeployment(t *testing.T) { clientset := test.New(t, 1) context := &clusterd.Context{Clientset: clientset} ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, context, "ns", cephv1.ClusterSpec{}, ownerInfo) setCommonMonProperties(c, 1, cephv1.MonSpec{Count: 0, AllowMultiplePerNode: true}, "myversion") name := "a" @@ -301,7 +301,7 @@ func TestCheckHealthNotFound(t *testing.T) { Executor: executor, } ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, context, "ns", cephv1.ClusterSpec{}, ownerInfo) setCommonMonProperties(c, 2, cephv1.MonSpec{Count: 3, AllowMultiplePerNode: true}, "myversion") c.waitForStart = false @@ -362,7 +362,7 @@ func TestAddRemoveMons(t *testing.T) { Executor: executor, } ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(context, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, context, "ns", cephv1.ClusterSpec{}, ownerInfo) setCommonMonProperties(c, 0, cephv1.MonSpec{Count: 5, AllowMultiplePerNode: true}, "myversion") c.maxMonID = 0 // "a" is max mon id c.waitForStart = false diff --git a/pkg/operator/ceph/cluster/mon/mon.go b/pkg/operator/ceph/cluster/mon/mon.go index 28cf951f9815..8c03eaa957fa 100644 --- a/pkg/operator/ceph/cluster/mon/mon.go +++ b/pkg/operator/ceph/cluster/mon/mon.go @@ -20,6 +20,7 @@ limitations under the License. 
package mon import ( + "context" "encoding/json" "fmt" "reflect" @@ -163,9 +164,9 @@ type SchedulingResult struct { } // New creates an instance of a mon cluster -func New(context *clusterd.Context, namespace string, spec cephv1.ClusterSpec, ownerInfo *k8sutil.OwnerInfo) *Cluster { +func New(ctx context.Context, clusterdContext *clusterd.Context, namespace string, spec cephv1.ClusterSpec, ownerInfo *k8sutil.OwnerInfo) *Cluster { return &Cluster{ - context: context, + context: clusterdContext, spec: spec, Namespace: namespace, maxMonID: -1, @@ -175,6 +176,9 @@ func New(context *clusterd.Context, namespace string, spec cephv1.ClusterSpec, o Schedule: map[string]*MonScheduleInfo{}, }, ownerInfo: ownerInfo, + ClusterInfo: &cephclient.ClusterInfo{ + Context: ctx, + }, } } diff --git a/pkg/operator/ceph/cluster/mon/mon_test.go b/pkg/operator/ceph/cluster/mon/mon_test.go index 402fac2294dd..572fe37dfd35 100644 --- a/pkg/operator/ceph/cluster/mon/mon_test.go +++ b/pkg/operator/ceph/cluster/mon/mon_test.go @@ -270,7 +270,7 @@ func validateStart(t *testing.T, c *Cluster) { func TestPersistMons(t *testing.T) { clientset := test.New(t, 1) ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(&clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{Annotations: cephv1.AnnotationsSpec{cephv1.KeyClusterMetadata: cephv1.Annotations{"key": "value"}}}, ownerInfo) + c := New(context.TODO(), &clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{Annotations: cephv1.AnnotationsSpec{cephv1.KeyClusterMetadata: cephv1.Annotations{"key": "value"}}}, ownerInfo) setCommonMonProperties(c, 1, cephv1.MonSpec{Count: 3, AllowMultiplePerNode: true}, "myversion") // Persist mon a @@ -298,7 +298,7 @@ func TestSaveMonEndpoints(t *testing.T) { clientset := test.New(t, 1) configDir := t.TempDir() ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(&clusterd.Context{Clientset: clientset, ConfigDir: configDir}, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(ctx, &clusterd.Context{Clientset: clientset, ConfigDir: configDir}, "ns", cephv1.ClusterSpec{}, ownerInfo) setCommonMonProperties(c, 1, cephv1.MonSpec{Count: 3, AllowMultiplePerNode: true}, "myversion") // create the initial config map @@ -345,7 +345,7 @@ func TestMaxMonID(t *testing.T) { clientset := test.New(t, 1) configDir := t.TempDir() ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() - c := New(&clusterd.Context{Clientset: clientset, ConfigDir: configDir}, "ns", cephv1.ClusterSpec{}, ownerInfo) + c := New(context.TODO(), &clusterd.Context{Clientset: clientset, ConfigDir: configDir}, "ns", cephv1.ClusterSpec{}, ownerInfo) c.ClusterInfo = clienttest.CreateTestClusterInfo(1) // when the configmap is not found, the maxMonID is -1 diff --git a/pkg/operator/ceph/cluster/mon/node_test.go b/pkg/operator/ceph/cluster/mon/node_test.go index 98bf7fbe88a4..c5bb7005a2db 100644 --- a/pkg/operator/ceph/cluster/mon/node_test.go +++ b/pkg/operator/ceph/cluster/mon/node_test.go @@ -36,7 +36,7 @@ import ( func TestNodeAffinity(t *testing.T) { ctx := context.TODO() clientset := test.New(t, 4) - c := New(&clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) + c := New(ctx, &clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) setCommonMonProperties(c, 0, cephv1.MonSpec{Count: 3, AllowMultiplePerNode: true}, "myversion") c.spec.Placement = map[cephv1.KeyType]cephv1.Placement{} @@ -166,7 +166,7 @@ func TestPodMemory(t *testing.T) { func TestHostNetwork(t 
*testing.T) { clientset := test.New(t, 3) - c := New(&clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) + c := New(context.TODO(), &clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) setCommonMonProperties(c, 0, cephv1.MonSpec{Count: 3, AllowMultiplePerNode: true}, "myversion") c.spec.Network.HostNetwork = true diff --git a/pkg/operator/ceph/cluster/mon/service_test.go b/pkg/operator/ceph/cluster/mon/service_test.go index c8cf31e36d9b..c80072b825e7 100644 --- a/pkg/operator/ceph/cluster/mon/service_test.go +++ b/pkg/operator/ceph/cluster/mon/service_test.go @@ -32,7 +32,7 @@ import ( func TestCreateService(t *testing.T) { ctx := context.TODO() clientset := test.New(t, 1) - c := New(&clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) + c := New(ctx, &clusterd.Context{Clientset: clientset}, "ns", cephv1.ClusterSpec{}, &k8sutil.OwnerInfo{}) c.ClusterInfo = client.AdminTestClusterInfo("rook-ceph") m := &monConfig{ResourceName: "rook-ceph-mon-b", DaemonName: "b"} clusterIP, err := c.createService(m) diff --git a/pkg/operator/ceph/cluster/mon/spec_test.go b/pkg/operator/ceph/cluster/mon/spec_test.go index a5059fc24aef..c315bd4df746 100644 --- a/pkg/operator/ceph/cluster/mon/spec_test.go +++ b/pkg/operator/ceph/cluster/mon/spec_test.go @@ -17,6 +17,7 @@ limitations under the License. package mon import ( + "context" "testing" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" @@ -43,6 +44,7 @@ func testPodSpec(t *testing.T, monID string, pvc bool) { clientset := testop.New(t, 1) ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() c := New( + context.TODO(), &clusterd.Context{Clientset: clientset, ConfigDir: "/var/lib/rook"}, "ns", cephv1.ClusterSpec{}, @@ -104,6 +106,7 @@ func TestDeploymentPVCSpec(t *testing.T) { clientset := testop.New(t, 1) ownerInfo := cephclient.NewMinimumOwnerInfoWithOwnerRef() c := New( + context.TODO(), &clusterd.Context{Clientset: clientset, ConfigDir: "/var/lib/rook"}, "ns", cephv1.ClusterSpec{}, @@ -163,6 +166,7 @@ func TestDeploymentPVCSpec(t *testing.T) { func testRequiredDuringScheduling(t *testing.T, hostNetwork, allowMultiplePerNode, required bool) { c := New( + context.TODO(), &clusterd.Context{}, "ns", cephv1.ClusterSpec{}, diff --git a/pkg/operator/ceph/cluster/osd/deviceSet.go b/pkg/operator/ceph/cluster/osd/deviceSet.go index bcac44b34d51..df150b21050e 100644 --- a/pkg/operator/ceph/cluster/osd/deviceSet.go +++ b/pkg/operator/ceph/cluster/osd/deviceSet.go @@ -80,7 +80,7 @@ func (c *Cluster) PrepareStorageClassDeviceSets() error { func (c *Cluster) prepareStorageClassDeviceSets(errs *provisionErrors) { c.deviceSets = []deviceSet{} - existingPVCs, uniqueOSDsPerDeviceSet, err := GetExistingPVCs(c.context, c.clusterInfo.Namespace) + existingPVCs, uniqueOSDsPerDeviceSet, err := GetExistingPVCs(c.clusterInfo.Context, c.context, c.clusterInfo.Namespace) if err != nil { errs.addError("failed to detect existing OSD PVCs. %v", err) return @@ -257,8 +257,7 @@ func makeDeviceSetPVC(deviceSetName, pvcID string, setIndex int, pvcTemplate v1. 
} // GetExistingPVCs fetches the list of OSD PVCs -func GetExistingPVCs(clusterdContext *clusterd.Context, namespace string) (map[string]*v1.PersistentVolumeClaim, map[string]sets.String, error) { - ctx := context.TODO() +func GetExistingPVCs(ctx context.Context, clusterdContext *clusterd.Context, namespace string) (map[string]*v1.PersistentVolumeClaim, map[string]sets.String, error) { selector := metav1.ListOptions{LabelSelector: CephDeviceSetPVCIDLabelKey} pvcs, err := clusterdContext.Clientset.CoreV1().PersistentVolumeClaims(namespace).List(ctx, selector) if err != nil { diff --git a/pkg/operator/ceph/cluster/predicate.go b/pkg/operator/ceph/cluster/predicate.go index a0113368ff90..001adb548214 100644 --- a/pkg/operator/ceph/cluster/predicate.go +++ b/pkg/operator/ceph/cluster/predicate.go @@ -35,16 +35,16 @@ import ( ) // predicateForNodeWatcher is the predicate function to trigger reconcile on Node events -func predicateForNodeWatcher(client client.Client, context *clusterd.Context) predicate.Funcs { +func predicateForNodeWatcher(ctx context.Context, client client.Client, context *clusterd.Context) predicate.Funcs { return predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { clientCluster := newClientCluster(client, e.Object.GetNamespace(), context) - return clientCluster.onK8sNode(e.Object) + return clientCluster.onK8sNode(ctx, e.Object) }, UpdateFunc: func(e event.UpdateEvent) bool { clientCluster := newClientCluster(client, e.ObjectNew.GetNamespace(), context) - return clientCluster.onK8sNode(e.ObjectNew) + return clientCluster.onK8sNode(ctx, e.ObjectNew) }, DeleteFunc: func(e event.DeleteEvent) bool { diff --git a/pkg/operator/ceph/cluster/watcher.go b/pkg/operator/ceph/cluster/watcher.go index cff3c293f0d2..391fb7388782 100644 --- a/pkg/operator/ceph/cluster/watcher.go +++ b/pkg/operator/ceph/cluster/watcher.go @@ -60,7 +60,7 @@ func checkStorageForNode(cluster *cephv1.CephCluster) bool { } // onK8sNodeAdd is triggered when a node is added in the Kubernetes cluster -func (c *clientCluster) onK8sNode(object runtime.Object) bool { +func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bool { node, ok := object.(*v1.Node) if !ok { return false @@ -106,7 +106,7 @@ func (c *clientCluster) onK8sNode(object runtime.Object) bool { // Is the node in the CRUSH map already? // If so we don't need to reconcile, this is done to avoid double reconcile on operator restart // Assume the admin key since we are watching for node status to create OSDs - clusterInfo := cephclient.AdminClusterInfo(cluster.Namespace, cluster.Name) + clusterInfo := cephclient.AdminClusterInfo(ctx, cluster.Namespace, cluster.Name) osds, err := cephclient.GetOSDOnHost(c.context, clusterInfo, nodeName) if err != nil { if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) { diff --git a/pkg/operator/ceph/cluster/watcher_test.go b/pkg/operator/ceph/cluster/watcher_test.go index 17f1834df6e9..8aa3cbc162e8 100644 --- a/pkg/operator/ceph/cluster/watcher_test.go +++ b/pkg/operator/ceph/cluster/watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package cluster import ( + "context" "os" "testing" @@ -87,6 +88,7 @@ func TestCheckStorageForNode(t *testing.T) { func TestOnK8sNode(t *testing.T) { ns := "rook-ceph" + ctx := context.TODO() cephCluster := fakeCluster(ns) objects := []runtime.Object{ cephCluster, @@ -136,11 +138,11 @@ func TestOnK8sNode(t *testing.T) { cephCluster.Status.Phase = k8sutil.ReadyStatus client = getFakeClient(objects...) 
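predicateForNodeWatcher above now closes over the operator's context so that node events arriving at any point in the process lifetime are handled under it, and onK8sNode can in turn pass it to AdminClusterInfo. A self-contained sketch of that closure pattern — nodeEvent, handlerFuncs, and newNodePredicate are simplified stand-ins for controller-runtime's event/predicate types, not Rook code:

```go
package main

import (
	"context"
	"fmt"
)

// nodeEvent and handlerFuncs are simplified stand-ins for controller-runtime's
// event.CreateEvent and predicate.Funcs used in predicate.go.
type nodeEvent struct{ name string }

type handlerFuncs struct {
	create func(nodeEvent) bool
}

// newNodePredicate mirrors the shape of predicateForNodeWatcher: the operator
// context is captured by the closure, so every future event sees it.
func newNodePredicate(ctx context.Context) handlerFuncs {
	return handlerFuncs{
		create: func(e nodeEvent) bool {
			if ctx.Err() != nil {
				return false // operator shutting down; skip triggering a reconcile
			}
			fmt.Println("would reconcile for node", e.name)
			return true
		},
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	p := newNodePredicate(ctx)
	fmt.Println(p.create(nodeEvent{name: "worker-0"})) // true
	cancel()
	fmt.Println(p.create(nodeEvent{name: "worker-1"})) // false once ctx is cancelled
}
```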
clientCluster.client = client - b := clientCluster.onK8sNode(node) + b := clientCluster.onK8sNode(ctx, node) assert.True(t, b) // node will not reconcile - b = clientCluster.onK8sNode(node) + b = clientCluster.onK8sNode(ctx, node) assert.False(t, b) } diff --git a/pkg/operator/ceph/config/config.go b/pkg/operator/ceph/config/config.go index 14de2a231738..5cb7754d77ba 100644 --- a/pkg/operator/ceph/config/config.go +++ b/pkg/operator/ceph/config/config.go @@ -141,7 +141,7 @@ func SetOrRemoveDefaultConfigs( // Apply Multus if needed if clusterSpec.Network.IsMultus() { logger.Info("configuring ceph network(s) with multus") - cephNetworks, err := generateNetworkSettings(context, clusterInfo.Namespace, clusterSpec.Network.Selectors) + cephNetworks, err := generateNetworkSettings(clusterInfo.Context, context, clusterInfo.Namespace, clusterSpec.Network.Selectors) if err != nil { return errors.Wrap(err, "failed to generate network settings") } diff --git a/pkg/operator/ceph/config/network.go b/pkg/operator/ceph/config/network.go index 68478f448cfd..574577735405 100644 --- a/pkg/operator/ceph/config/network.go +++ b/pkg/operator/ceph/config/network.go @@ -45,8 +45,7 @@ var ( NetworkSelectors = []string{PublicNetworkSelectorKeyName, ClusterNetworkSelectorKeyName} ) -func generateNetworkSettings(clusterdContext *clusterd.Context, namespace string, networkSelectors map[string]string) ([]Option, error) { - ctx := context.TODO() +func generateNetworkSettings(ctx context.Context, clusterdContext *clusterd.Context, namespace string, networkSelectors map[string]string) ([]Option, error) { cephNetworks := []Option{} for _, selectorKey := range NetworkSelectors { diff --git a/pkg/operator/ceph/config/network_test.go b/pkg/operator/ceph/config/network_test.go index 00df7f67756f..2e9c99821a64 100644 --- a/pkg/operator/ceph/config/network_test.go +++ b/pkg/operator/ceph/config/network_test.go @@ -32,14 +32,15 @@ import ( func TestGenerateNetworkSettings(t *testing.T) { t.Run("no network definition exists", func(*testing.T) { + ctx := context.TODO() ns := "rook-ceph" clientset := testop.New(t, 1) - ctx := &clusterd.Context{ + clusterdContext := &clusterd.Context{ Clientset: clientset, NetworkClient: fakenetclient.NewSimpleClientset().K8sCniCncfIoV1(), } netSelector := map[string]string{"public": "public-network-attach-def"} - _, err := generateNetworkSettings(ctx, ns, netSelector) + _, err := generateNetworkSettings(ctx, clusterdContext, ns, netSelector) assert.Error(t, err) }) @@ -57,25 +58,25 @@ func TestGenerateNetworkSettings(t *testing.T) { ctxt := context.TODO() ns := "rook-ceph" clientset := testop.New(t, 1) - ctx := &clusterd.Context{ + clusterdContext := &clusterd.Context{ Clientset: clientset, NetworkClient: fakenetclient.NewSimpleClientset().K8sCniCncfIoV1(), } for i := range networks { - _, err := ctx.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctxt, &networks[i], metav1.CreateOptions{}) + _, err := clusterdContext.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctxt, &networks[i], metav1.CreateOptions{}) assert.NoError(t, err) } - cephNetwork, err := generateNetworkSettings(ctx, ns, netSelector) + cephNetwork, err := generateNetworkSettings(context.TODO(), clusterdContext, ns, netSelector) assert.NoError(t, err) assert.ElementsMatch(t, cephNetwork, expectedNetworks, fmt.Sprintf("networks: %+v", cephNetwork)) }) t.Run("only public network", func(*testing.T) { - ctxt := context.TODO() + ctx := context.TODO() ns := "rook-ceph" clientset := testop.New(t, 1) - ctx := &clusterd.Context{ + 
clusterdContext := &clusterd.Context{ Clientset: clientset, NetworkClient: fakenetclient.NewSimpleClientset().K8sCniCncfIoV1(), } @@ -89,20 +90,20 @@ func TestGenerateNetworkSettings(t *testing.T) { }, } for i := range networks { - _, err := ctx.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctxt, &networks[i], metav1.CreateOptions{}) + _, err := clusterdContext.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctx, &networks[i], metav1.CreateOptions{}) assert.NoError(t, err) } - cephNetwork, err := generateNetworkSettings(ctx, ns, netSelector) + cephNetwork, err := generateNetworkSettings(ctx, clusterdContext, ns, netSelector) assert.NoError(t, err) assert.ElementsMatch(t, cephNetwork, expectedNetworks, fmt.Sprintf("networks: %+v", cephNetwork)) }) t.Run("public and cluster network", func(*testing.T) { - ctxt := context.TODO() + ctx := context.TODO() ns := "rook-ceph" clientset := testop.New(t, 1) - ctx := &clusterd.Context{ + clusterdContext := &clusterd.Context{ Clientset: clientset, NetworkClient: fakenetclient.NewSimpleClientset().K8sCniCncfIoV1(), } @@ -124,10 +125,10 @@ func TestGenerateNetworkSettings(t *testing.T) { }, } for i := range networks { - _, err := ctx.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctxt, &networks[i], metav1.CreateOptions{}) + _, err := clusterdContext.NetworkClient.NetworkAttachmentDefinitions(ns).Create(ctx, &networks[i], metav1.CreateOptions{}) assert.NoError(t, err) } - cephNetwork, err := generateNetworkSettings(ctx, ns, netSelector) + cephNetwork, err := generateNetworkSettings(ctx, clusterdContext, ns, netSelector) assert.NoError(t, err) assert.ElementsMatch(t, cephNetwork, expectedNetworks, fmt.Sprintf("networks: %+v", cephNetwork)) diff --git a/pkg/operator/ceph/controller.go b/pkg/operator/ceph/controller.go index 9a29db1bf817..861eef85705d 100644 --- a/pkg/operator/ceph/controller.go +++ b/pkg/operator/ceph/controller.go @@ -52,7 +52,7 @@ type ReconcileConfig struct { // Add creates a new Operator configuration Controller and adds it to the Manager. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error { - return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig)) + return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig)) } // newReconciler returns a new reconcile.Reconciler @@ -65,7 +65,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont } } -func add(mgr manager.Manager, r reconcile.Reconciler) error { +func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r}) if err != nil { @@ -75,14 +75,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Watch for ConfigMap (operator config) err = c.Watch(&source.Kind{ - Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(mgr.GetClient())) + Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient())) if err != nil { return err } // Watch for Secret (admission controller secret) err = c.Watch(&source.Kind{ - Type: &v1.Secret{TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(mgr.GetClient())) + Type: &v1.Secret{TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient())) if err != nil { return err } diff --git a/pkg/operator/ceph/csi/cluster_config.go b/pkg/operator/ceph/csi/cluster_config.go index 1c95b4bf24fe..7e665dbb8905 100644 --- a/pkg/operator/ceph/csi/cluster_config.go +++ b/pkg/operator/ceph/csi/cluster_config.go @@ -155,8 +155,7 @@ func updateCsiClusterConfig(curr, clusterKey string, newCsiClusterConfigEntry *C // CreateCsiConfigMap creates an empty config map that will be later used // to provide cluster configuration to ceph-csi. If a config map already // exists, it will return it. -func CreateCsiConfigMap(namespace string, clientset kubernetes.Interface, ownerInfo *k8sutil.OwnerInfo) error { - ctx := context.TODO() +func CreateCsiConfigMap(ctx context.Context, namespace string, clientset kubernetes.Interface, ownerInfo *k8sutil.OwnerInfo) error { configMap := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: ConfigName, diff --git a/pkg/operator/ceph/csi/controller.go b/pkg/operator/ceph/csi/controller.go index 3d2c1517448d..13e990898c7f 100644 --- a/pkg/operator/ceph/csi/controller.go +++ b/pkg/operator/ceph/csi/controller.go @@ -184,7 +184,7 @@ func (r *ReconcileCSI) reconcile(request reconcile.Request) (reconcile.Result, e ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(ownerRef, r.opConfig.OperatorNamespace) // create an empty config map. 
config map will be filled with data // later when clusters have mons - err = CreateCsiConfigMap(r.opConfig.OperatorNamespace, r.context.Clientset, ownerInfo) + err = CreateCsiConfigMap(r.opManagerContext, r.opConfig.OperatorNamespace, r.context.Clientset, ownerInfo) if err != nil { return opcontroller.ImmediateRetryResult, errors.Wrap(err, "failed creating csi config map") } diff --git a/pkg/operator/ceph/disruption/clusterdisruption/osd.go b/pkg/operator/ceph/disruption/clusterdisruption/osd.go index 2fb2dd0f5af5..1359bf8d880c 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/osd.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/osd.go @@ -54,7 +54,7 @@ const ( ) func (r *ReconcileClusterDisruption) createPDB(pdb client.Object) error { - err := r.client.Create(context.TODO(), pdb) + err := r.client.Create(r.context.OpManagerContext, pdb) if err != nil && !apierrors.IsAlreadyExists(err) { return errors.Wrapf(err, "failed to create pdb %q", pdb.GetName()) } @@ -62,7 +62,7 @@ func (r *ReconcileClusterDisruption) createPDB(pdb client.Object) error { } func (r *ReconcileClusterDisruption) deletePDB(pdb client.Object) error { - err := r.client.Delete(context.TODO(), pdb) + err := r.client.Delete(r.context.OpManagerContext, pdb) if err != nil && !apierrors.IsNotFound(err) { return errors.Wrapf(err, "failed to delete pdb %q", pdb.GetName()) } @@ -102,7 +102,7 @@ func (r *ReconcileClusterDisruption) createDefaultPDBforOSD(namespace string) er return errors.Wrapf(err, "failed to set owner reference to pdb %v", pdb) } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1beta1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1beta1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { logger.Info("all PGs are active+clean. Restoring default OSD pdb settings") @@ -126,7 +126,7 @@ func (r *ReconcileClusterDisruption) createDefaultPDBforOSD(namespace string) er return errors.Wrapf(err, "failed to set owner reference to pdb %v", pdb) } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { logger.Info("all PGs are active+clean. 
Restoring default OSD pdb settings") @@ -152,7 +152,7 @@ func (r *ReconcileClusterDisruption) deleteDefaultPDBforOSD(namespace string) er pdb := &policyv1beta1.PodDisruptionBudget{ ObjectMeta: objectMeta, } - err := r.client.Get(context.TODO(), pdbRequest, &policyv1beta1.PodDisruptionBudget{}) + err := r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1beta1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -165,7 +165,7 @@ func (r *ReconcileClusterDisruption) deleteDefaultPDBforOSD(namespace string) er pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: objectMeta, } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -210,7 +210,7 @@ func (r *ReconcileClusterDisruption) createBlockingPDBForOSD(namespace, failureD if err != nil { return errors.Wrapf(err, "failed to set owner reference to pdb %v", pdb) } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1beta1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1beta1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { logger.Infof("creating temporary blocking pdb %q with maxUnavailable=0 for %q failure domain %q", pdbName, failureDomainType, failureDomainName) @@ -232,7 +232,7 @@ func (r *ReconcileClusterDisruption) createBlockingPDBForOSD(namespace, failureD if err != nil { return errors.Wrapf(err, "failed to set owner reference to pdb %v", pdb) } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { logger.Infof("creating temporary blocking pdb %q with maxUnavailable=0 for %q failure domain %q", pdbName, failureDomainType, failureDomainName) @@ -258,7 +258,7 @@ func (r *ReconcileClusterDisruption) deleteBlockingPDBForOSD(namespace, failureD pdb := &policyv1beta1.PodDisruptionBudget{ ObjectMeta: objectMeta, } - err := r.client.Get(context.TODO(), pdbRequest, &policyv1beta1.PodDisruptionBudget{}) + err := r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1beta1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -271,7 +271,7 @@ func (r *ReconcileClusterDisruption) deleteBlockingPDBForOSD(namespace, failureD pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: objectMeta, } - err = r.client.Get(context.TODO(), pdbRequest, &policyv1.PodDisruptionBudget{}) + err = r.client.Get(r.context.OpManagerContext, pdbRequest, &policyv1.PodDisruptionBudget{}) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -293,12 +293,12 @@ func (r *ReconcileClusterDisruption) initializePDBState(request reconcile.Reques Name: pdbStateMapName, Namespace: request.Namespace, } - err := r.client.Get(context.TODO(), pdbStateMapRequest, pdbStateMap) + err := r.client.Get(r.context.OpManagerContext, pdbStateMapRequest, pdbStateMap) if apierrors.IsNotFound(err) { // create configmap to track the draining failure domain pdbStateMap.Data = map[string]string{drainingFailureDomainKey: "", setNoOut: ""} - err := r.client.Create(context.TODO(), pdbStateMap) + err := r.client.Create(r.context.OpManagerContext, pdbStateMap) if err != nil { return pdbStateMap, errors.Wrapf(err, "failed to create the PDB state map %q", pdbStateMapRequest) } @@ -398,7 +398,7 @@ func (r 
*ReconcileClusterDisruption) reconcilePDBsForOSDs( pdbStateMap.Data[setNoOut] = "" } - err = r.client.Update(context.TODO(), pdbStateMap) + err = r.client.Update(clusterInfo.Context, pdbStateMap) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update configMap %q in cluster %q", pdbStateMapName, request) } @@ -520,7 +520,7 @@ func (r *ReconcileClusterDisruption) getOSDFailureDomains(clusterInfo *cephclien osdDeploymentList := &appsv1.DeploymentList{} namespaceListOpts := client.InNamespace(request.Namespace) topologyLocationLabel := fmt.Sprintf(osd.TopologyLocationLabel, poolFailureDomain) - err := r.client.List(context.TODO(), osdDeploymentList, client.MatchingLabels{k8sutil.AppAttr: osd.AppName}, namespaceListOpts) + err := r.client.List(clusterInfo.Context, osdDeploymentList, client.MatchingLabels{k8sutil.AppAttr: osd.AppName}, namespaceListOpts) if err != nil { return nil, nil, nil, errors.Wrap(err, "failed to list osd deployments") } @@ -542,7 +542,7 @@ func (r *ReconcileClusterDisruption) getOSDFailureDomains(clusterInfo *cephclien if !osdDownFailureDomains.Has(failureDomainName) { osdDownFailureDomains.Insert(failureDomainName) } - isDrained, err := hasOSDNodeDrained(r.client, request.Namespace, labels[osd.OsdIdLabelKey]) + isDrained, err := hasOSDNodeDrained(clusterInfo.Context, r.client, request.Namespace, labels[osd.OsdIdLabelKey]) if err != nil { return nil, nil, nil, errors.Wrapf(err, "failed to check if osd %q node is drained", deployment.Name) } @@ -591,8 +591,8 @@ func (r *ReconcileClusterDisruption) hasPGHealthCheckTimedout(pdbStateMap *corev } // hasNodeDrained returns true if OSD pod is not assigned to any node or if the OSD node is not schedulable -func hasOSDNodeDrained(c client.Client, namespace, osdID string) (bool, error) { - osdNodeName, err := getOSDNodeName(c, namespace, osdID) +func hasOSDNodeDrained(ctx context.Context, c client.Client, namespace, osdID string) (bool, error) { + osdNodeName, err := getOSDNodeName(ctx, c, namespace, osdID) if err != nil { return false, errors.Wrapf(err, "failed to get node name assigned to OSD %q POD", osdID) } @@ -602,21 +602,21 @@ func hasOSDNodeDrained(c client.Client, namespace, osdID string) (bool, error) { return true, nil } - node, err := getNode(c, osdNodeName) + node, err := getNode(ctx, c, osdNodeName) if err != nil { return false, errors.Wrapf(err, "failed to get node assigned to OSD %q POD", osdID) } return node.Spec.Unschedulable, nil } -func getOSDNodeName(c client.Client, namespace, osdID string) (string, error) { +func getOSDNodeName(ctx context.Context, c client.Client, namespace, osdID string) (string, error) { pods := &corev1.PodList{} listOpts := []client.ListOption{ client.InNamespace(namespace), client.MatchingLabels{osd.OsdIdLabelKey: osdID}, } - err := c.List(context.TODO(), pods, listOpts...) + err := c.List(ctx, pods, listOpts...) 
if err != nil { return "", errors.Wrapf(err, "failed to list pods for osd %q", osdID) } @@ -627,9 +627,9 @@ func getOSDNodeName(c client.Client, namespace, osdID string) (string, error) { return "", nil } -func getNode(c client.Client, nodeName string) (*corev1.Node, error) { +func getNode(ctx context.Context, c client.Client, nodeName string) (*corev1.Node, error) { node := &corev1.Node{} - err := c.Get(context.TODO(), types.NamespacedName{Name: nodeName}, node) + err := c.Get(ctx, types.NamespacedName{Name: nodeName}, node) if err != nil { return nil, errors.Wrapf(err, "failed to get node %q", nodeName) } @@ -663,7 +663,7 @@ func (r *ReconcileClusterDisruption) getAllowedDisruptions(pdbName, namespace st } if usePDBV1Beta1 { pdb := &policyv1beta1.PodDisruptionBudget{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: pdbName, Namespace: namespace}, pdb) + err = r.client.Get(r.context.OpManagerContext, types.NamespacedName{Name: pdbName, Namespace: namespace}, pdb) if err != nil { return -1, err } @@ -672,7 +672,7 @@ func (r *ReconcileClusterDisruption) getAllowedDisruptions(pdbName, namespace st } pdb := &policyv1.PodDisruptionBudget{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: pdbName, Namespace: namespace}, pdb) + err = r.client.Get(r.context.OpManagerContext, types.NamespacedName{Name: pdbName, Namespace: namespace}, pdb) if err != nil { return -1, err } diff --git a/pkg/operator/ceph/disruption/clusterdisruption/osd_test.go b/pkg/operator/ceph/disruption/clusterdisruption/osd_test.go index 36d2a2564a64..f973ebf0b4a5 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/osd_test.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/osd_test.go @@ -448,15 +448,16 @@ func TestPGHealthcheckTimeout(t *testing.T) { func TestHasNodeDrained(t *testing.T) { osdPOD := fakeOSDPod(0, nodeName) + ctx := context.TODO() // Not expecting node drain because OSD pod is assigned to a schedulable node r := getFakeReconciler(t, nodeObj, osdPOD.DeepCopy(), &corev1.ConfigMap{}) - expected, err := hasOSDNodeDrained(r.client, namespace, "0") + expected, err := hasOSDNodeDrained(ctx, r.client, namespace, "0") assert.NoError(t, err) assert.False(t, expected) // Expecting node drain because OSD pod is assigned to an unschedulable node r = getFakeReconciler(t, unschedulableNodeObj, osdPOD.DeepCopy(), &corev1.ConfigMap{}) - expected, err = hasOSDNodeDrained(r.client, namespace, "0") + expected, err = hasOSDNodeDrained(ctx, r.client, namespace, "0") assert.NoError(t, err) assert.True(t, expected) @@ -464,7 +465,7 @@ func TestHasNodeDrained(t *testing.T) { osdPodObj := osdPOD.DeepCopy() osdPodObj.Spec.NodeName = "" r = getFakeReconciler(t, nodeObj, osdPodObj, &corev1.ConfigMap{}) - expected, err = hasOSDNodeDrained(r.client, namespace, "0") + expected, err = hasOSDNodeDrained(ctx, r.client, namespace, "0") assert.NoError(t, err) assert.True(t, expected) } diff --git a/pkg/operator/ceph/disruption/clusterdisruption/pools.go b/pkg/operator/ceph/disruption/clusterdisruption/pools.go index 431580590307..3126702abe2b 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/pools.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/pools.go @@ -17,7 +17,6 @@ limitations under the License. 
package clusterdisruption import ( - "context" "fmt" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,7 +39,7 @@ func (r *ReconcileClusterDisruption) processPools(request reconcile.Request) (*c poolSpecs := make([]cephv1.PoolSpec, 0) poolCount := 0 cephBlockPoolList := &cephv1.CephBlockPoolList{} - err := r.client.List(context.TODO(), cephBlockPoolList, namespaceListOpt) + err := r.client.List(r.context.OpManagerContext, cephBlockPoolList, namespaceListOpt) if err != nil { return nil, nil, "", poolCount, errors.Wrapf(err, "could not list the CephBlockpools %v", request.NamespacedName) } @@ -50,7 +49,7 @@ func (r *ReconcileClusterDisruption) processPools(request reconcile.Request) (*c } cephFilesystemList := &cephv1.CephFilesystemList{} - err = r.client.List(context.TODO(), cephFilesystemList, namespaceListOpt) + err = r.client.List(r.context.OpManagerContext, cephFilesystemList, namespaceListOpt) if err != nil { return nil, nil, "", poolCount, errors.Wrapf(err, "could not list the CephFilesystems %v", request.NamespacedName) } @@ -64,7 +63,7 @@ func (r *ReconcileClusterDisruption) processPools(request reconcile.Request) (*c } cephObjectStoreList := &cephv1.CephObjectStoreList{} - err = r.client.List(context.TODO(), cephObjectStoreList, namespaceListOpt) + err = r.client.List(r.context.OpManagerContext, cephObjectStoreList, namespaceListOpt) if err != nil { return nil, nil, "", poolCount, errors.Wrapf(err, "could not list the CephObjectStores %v", request.NamespacedName) } diff --git a/pkg/operator/ceph/disruption/clusterdisruption/reconcile.go b/pkg/operator/ceph/disruption/clusterdisruption/reconcile.go index cf3194558233..bda740810020 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/reconcile.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/reconcile.go @@ -86,7 +86,7 @@ func (r *ReconcileClusterDisruption) reconcile(request reconcile.Request) (recon // get the ceph cluster cephClusters := &cephv1.CephClusterList{} - if err := r.client.List(context.TODO(), cephClusters, client.InNamespace(request.Namespace)); err != nil { + if err := r.client.List(r.context.OpManagerContext, cephClusters, client.InNamespace(request.Namespace)); err != nil { return reconcile.Result{}, errors.Wrapf(err, "could not get cephclusters in namespace %q", request.Namespace) } if len(cephClusters.Items) == 0 { @@ -244,7 +244,7 @@ func (c *ClusterMap) GetClusterNamespaces() []string { } func (r *ReconcileClusterDisruption) deleteDrainCanaryPods(namespace string) error { - err := r.client.DeleteAllOf(context.TODO(), &appsv1.Deployment{}, client.InNamespace(namespace), + err := r.client.DeleteAllOf(r.context.OpManagerContext, &appsv1.Deployment{}, client.InNamespace(namespace), client.MatchingLabels{k8sutil.AppAttr: legacyDrainCanaryLabel}) if err != nil && !kerrors.IsNotFound(err) { return errors.Wrapf(err, "failed to delete all the legacy drain-canary pods with label %q", legacyDrainCanaryLabel) diff --git a/pkg/operator/ceph/disruption/clusterdisruption/static_pdb.go b/pkg/operator/ceph/disruption/clusterdisruption/static_pdb.go index 68642faf86a7..4134ff599806 100644 --- a/pkg/operator/ceph/disruption/clusterdisruption/static_pdb.go +++ b/pkg/operator/ceph/disruption/clusterdisruption/static_pdb.go @@ -17,8 +17,6 @@ limitations under the License. 
package clusterdisruption import ( - "context" - "github.com/pkg/errors" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,7 +28,7 @@ import ( ) func (r *ReconcileClusterDisruption) createStaticPDB(pdb client.Object) error { - err := r.client.Create(context.TODO(), pdb) + err := r.client.Create(r.context.OpManagerContext, pdb) if err != nil { return errors.Wrapf(err, "failed to create pdb %q", pdb.GetName()) } @@ -48,7 +46,7 @@ func (r *ReconcileClusterDisruption) reconcileStaticPDB(request types.Namespaced } else { existingPDB = &policyv1.PodDisruptionBudget{} } - err = r.client.Get(context.TODO(), request, existingPDB) + err = r.client.Get(r.context.OpManagerContext, request, existingPDB) if err != nil { if apierrors.IsNotFound(err) { return r.createStaticPDB(pdb) diff --git a/pkg/operator/ceph/disruption/machinedisruption/reconcile.go b/pkg/operator/ceph/disruption/machinedisruption/reconcile.go index cd22d320eaf4..515b282a17fb 100644 --- a/pkg/operator/ceph/disruption/machinedisruption/reconcile.go +++ b/pkg/operator/ceph/disruption/machinedisruption/reconcile.go @@ -72,7 +72,7 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco // Fetching the cephCluster cephClusterInstance := &cephv1.CephCluster{} - err := r.client.Get(context.TODO(), request.NamespacedName, cephClusterInstance) + err := r.client.Get(r.context.OpManagerContext, request.NamespacedName, cephClusterInstance) if kerrors.IsNotFound(err) { logger.Infof("cephCluster instance not found for %s", request.NamespacedName) return reconcile.Result{}, nil @@ -93,7 +93,7 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco }, } - err = r.client.Get(context.TODO(), types.NamespacedName{Name: mdb.GetName(), Namespace: mdb.GetNamespace()}, mdb) + err = r.client.Get(r.context.OpManagerContext, types.NamespacedName{Name: mdb.GetName(), Namespace: mdb.GetNamespace()}, mdb) if kerrors.IsNotFound(err) { // If the MDB is not found creating the MDB for the cephCluster maxUnavailable := int32(0) @@ -121,7 +121,7 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to set owner reference of mdb %q", newMDB.Name) } - err = r.client.Create(context.TODO(), newMDB) + err = r.client.Create(r.context.OpManagerContext, newMDB) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to create mdb %s", mdb.GetName()) } @@ -134,12 +134,12 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco mdb.Spec.MaxUnavailable = &maxUnavailable } // Check if the cluster is clean or not - clusterInfo := cephClient.AdminClusterInfo(request.NamespacedName.Namespace, request.NamespacedName.Name) + clusterInfo := cephClient.AdminClusterInfo(r.context.OpManagerContext, request.NamespacedName.Namespace, request.NamespacedName.Name) _, isClean, err := cephClient.IsClusterClean(r.context.ClusterdContext, clusterInfo) if err != nil { maxUnavailable := int32(0) mdb.Spec.MaxUnavailable = &maxUnavailable - err = r.client.Update(context.TODO(), mdb) + err = r.client.Update(r.context.OpManagerContext, mdb) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update mdb %s", mdb.GetName()) } @@ -148,14 +148,14 @@ func (r *MachineDisruptionReconciler) reconcile(request reconcile.Request) (reco if isClean && *mdb.Spec.MaxUnavailable != 1 { maxUnavailable := int32(1) mdb.Spec.MaxUnavailable = &maxUnavailable - err = r.client.Update(context.TODO(), mdb) + err = 
r.client.Update(r.context.OpManagerContext, mdb) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update mdb %s", mdb.GetName()) } } else if !isClean && *mdb.Spec.MaxUnavailable != 0 { maxUnavailable := int32(0) mdb.Spec.MaxUnavailable = &maxUnavailable - err = r.client.Update(context.TODO(), mdb) + err = r.client.Update(r.context.OpManagerContext, mdb) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update mdb %s", mdb.GetName()) } diff --git a/pkg/operator/ceph/disruption/machinelabel/reconcile.go b/pkg/operator/ceph/disruption/machinelabel/reconcile.go index ca4d6e581f18..ff90d61de46e 100644 --- a/pkg/operator/ceph/disruption/machinelabel/reconcile.go +++ b/pkg/operator/ceph/disruption/machinelabel/reconcile.go @@ -70,7 +70,7 @@ func (r *ReconcileMachineLabel) reconcile(request reconcile.Request) (reconcile. // Fetch list of osd pods for the requested ceph cluster pods := &corev1.PodList{} - err := r.client.List(context.TODO(), pods, client.InNamespace(request.Namespace), + err := r.client.List(r.options.OpManagerContext, pods, client.InNamespace(request.Namespace), client.MatchingLabels{k8sutil.AppAttr: osd.AppName, k8sutil.ClusterAttr: request.Name}) if err != nil { return reconcile.Result{}, err @@ -78,7 +78,7 @@ func (r *ReconcileMachineLabel) reconcile(request reconcile.Request) (reconcile. // Fetching the cephCluster cephClusterInstance := &cephv1.CephCluster{} - err = r.client.Get(context.TODO(), request.NamespacedName, cephClusterInstance) + err = r.client.Get(r.options.OpManagerContext, request.NamespacedName, cephClusterInstance) if kerrors.IsNotFound(err) { logger.Infof("cephCluster instance not found for %s", request.NamespacedName) return reconcile.Result{}, nil @@ -94,7 +94,7 @@ func (r *ReconcileMachineLabel) reconcile(request reconcile.Request) (reconcile. // Fetch list of machines available machines := &mapiv1.MachineList{} - err = r.client.List(context.TODO(), machines, client.InNamespace(cephClusterInstance.Spec.DisruptionManagement.MachineDisruptionBudgetNamespace)) + err = r.client.List(r.options.OpManagerContext, machines, client.InNamespace(cephClusterInstance.Spec.DisruptionManagement.MachineDisruptionBudgetNamespace)) if err != nil { return reconcile.Result{}, errors.Wrap(err, "failed tp fetch machine list") } @@ -128,7 +128,7 @@ func (r *ReconcileMachineLabel) reconcile(request reconcile.Request) (reconcile. labels[MachineFencingLabelKey] = request.Name labels[MachineFencingNamespaceLabelKey] = request.Namespace machine.RawMachine.SetLabels(labels) - err = r.client.Update(context.TODO(), &machine.RawMachine) + err = r.client.Update(r.options.OpManagerContext, &machine.RawMachine) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update machine %s", machine.RawMachine.GetName()) } @@ -140,7 +140,7 @@ func (r *ReconcileMachineLabel) reconcile(request reconcile.Request) (reconcile. 
labels[MachineFencingLabelKey] = "" labels[MachineFencingNamespaceLabelKey] = "" machine.RawMachine.SetLabels(labels) - err = r.client.Update(context.TODO(), &machine.RawMachine) + err = r.client.Update(r.options.OpManagerContext, &machine.RawMachine) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed to update machine %s", machine.RawMachine.GetName()) } diff --git a/pkg/operator/ceph/file/controller.go b/pkg/operator/ceph/file/controller.go index 9dd90d6ffc89..6ab072db5ced 100644 --- a/pkg/operator/ceph/file/controller.go +++ b/pkg/operator/ceph/file/controller.go @@ -253,10 +253,10 @@ func (r *ReconcileCephFilesystem) reconcile(request reconcile.Request) (reconcil return reconcile.Result{}, *cephFilesystem, err } if !deps.Empty() { - err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephFilesystem, deps) + err := reporting.ReportDeletionBlockedDueToDependents(r.opManagerContext, logger, r.client, cephFilesystem, deps) return opcontroller.WaitForRequeueIfFinalizerBlocked, *cephFilesystem, err } - reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephFilesystem) + reporting.ReportDeletionNotBlockedDueToDependents(r.opManagerContext, logger, r.client, r.recorder, cephFilesystem) runningCephVersion, err := cephclient.LeastUptodateDaemonVersion(r.context, clusterInfo, config.MonType) if err != nil { diff --git a/pkg/operator/ceph/file/subvolumegroup/controller_test.go b/pkg/operator/ceph/file/subvolumegroup/controller_test.go index fca3bff14f5a..7320389f7e44 100644 --- a/pkg/operator/ceph/file/subvolumegroup/controller_test.go +++ b/pkg/operator/ceph/file/subvolumegroup/controller_test.go @@ -217,7 +217,7 @@ func TestCephClientController(t *testing.T) { // Create CSI config map ownerRef := &metav1.OwnerReference{} ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(ownerRef, "") - err = csi.CreateCsiConfigMap(namespace, c.Clientset, ownerInfo) + err = csi.CreateCsiConfigMap(ctx, namespace, c.Clientset, ownerInfo) assert.NoError(t, err) res, err := r.Reconcile(ctx, req) @@ -264,7 +264,7 @@ func TestCephClientController(t *testing.T) { // Create CSI config map ownerRef := &metav1.OwnerReference{} ownerInfo := k8sutil.NewOwnerInfoWithOwnerRef(ownerRef, "") - err := csi.CreateCsiConfigMap(namespace, c.Clientset, ownerInfo) + err := csi.CreateCsiConfigMap(ctx, namespace, c.Clientset, ownerInfo) assert.NoError(t, err) res, err := r.Reconcile(ctx, req) diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go index 7e7dc78760cd..5100ad91d8f6 100644 --- a/pkg/operator/ceph/object/admin.go +++ b/pkg/operator/ceph/object/admin.go @@ -17,7 +17,6 @@ limitations under the License. 
package object import ( - "context" "encoding/json" "fmt" "net/http" @@ -122,7 +121,7 @@ func NewMultisiteContext(context *clusterd.Context, clusterInfo *cephclient.Clus return nil, err } - realmName, zoneGroupName, zoneName, err := getMultisiteForObjectStore(context, &store.Spec, store.Namespace, store.Name) + realmName, zoneGroupName, zoneName, err := getMultisiteForObjectStore(clusterInfo.Context, context, &store.Spec, store.Namespace, store.Name) if err != nil { return nil, errors.Wrapf(err, "failed to get realm/zone group/zone for object store %q", nsName) } @@ -216,7 +215,7 @@ func RunAdminCommandNoMultisite(c *Context, expectJSON bool, args ...string) (st // If Multus is enabled we proxy all the command to the mgr sidecar if c.CephClusterSpec.Network.IsMultus() { - output, stderr, err = c.Context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(cephclient.ProxyAppLabel, cephclient.CommandProxyInitContainerName, c.clusterInfo.Namespace, append([]string{"radosgw-admin"}, args...)...) + output, stderr, err = c.Context.RemoteExecutor.ExecCommandInContainerWithFullOutputWithTimeout(c.clusterInfo.Context, cephclient.ProxyAppLabel, cephclient.CommandProxyInitContainerName, c.clusterInfo.Namespace, append([]string{"radosgw-admin"}, args...)...) } else { command, args := cephclient.FinalizeCephCommandArgs("radosgw-admin", c.clusterInfo, args, c.Context.ConfigDir) output, err = c.Context.Executor.ExecuteCommandWithTimeout(exec.CephCommandsTimeout, command, args...) @@ -418,7 +417,7 @@ func GetAdminOPSUserCredentials(objContext *Context, spec *cephv1.ObjectStoreSpe if spec.IsExternal() { // Fetch the secret for admin ops user s := &v1.Secret{} - err := objContext.Context.Client.Get(context.TODO(), types.NamespacedName{Name: RGWAdminOpsUserSecretName, Namespace: ns}, s) + err := objContext.Context.Client.Get(objContext.clusterInfo.Context, types.NamespacedName{Name: RGWAdminOpsUserSecretName, Namespace: ns}, s) if err != nil { return "", "", err } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index d3a9f5838d9b..170974709e4a 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -192,7 +192,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci // The CR was just created, initializing status fields if cephObjectStore.Status == nil { // The store is not available so let's not build the status Info yet - updateStatus(k8sutil.ObservedGenerationNotAvailable, r.client, request.NamespacedName, cephv1.ConditionProgressing, map[string]string{}) + updateStatus(r.opManagerContext, k8sutil.ObservedGenerationNotAvailable, r.client, request.NamespacedName, cephv1.ConditionProgressing, map[string]string{}) } // Make sure a CephCluster is present otherwise do nothing @@ -241,7 +241,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci // DELETE: the CR was deleted if !cephObjectStore.GetDeletionTimestamp().IsZero() { - updateStatus(k8sutil.ObservedGenerationNotAvailable, r.client, request.NamespacedName, cephv1.ConditionDeleting, buildStatusInfo(cephObjectStore)) + updateStatus(r.opManagerContext, k8sutil.ObservedGenerationNotAvailable, r.client, request.NamespacedName, cephv1.ConditionDeleting, buildStatusInfo(cephObjectStore)) // Detect running Ceph version runningCephVersion, err := cephclient.LeastUptodateDaemonVersion(r.context, r.clusterInfo, config.MonType) @@ -269,10 +269,10 @@ func (r *ReconcileCephObjectStore) reconcile(request 
reconcile.Request) (reconci return reconcile.Result{}, *cephObjectStore, err } if !deps.Empty() { - err := reporting.ReportDeletionBlockedDueToDependents(logger, r.client, cephObjectStore, deps) + err := reporting.ReportDeletionBlockedDueToDependents(r.opManagerContext, logger, r.client, cephObjectStore, deps) return opcontroller.WaitForRequeueIfFinalizerBlocked, *cephObjectStore, err } - reporting.ReportDeletionNotBlockedDueToDependents(logger, r.client, r.recorder, cephObjectStore) + reporting.ReportDeletionNotBlockedDueToDependents(r.opManagerContext, logger, r.client, r.recorder, cephObjectStore) // Cancel the context to stop monitoring the health of the object store r.stopMonitoring(cephObjectStore) @@ -351,7 +351,7 @@ func (r *ReconcileCephObjectStore) reconcile(request reconcile.Request) (reconci // update ObservedGeneration in status at the end of reconcile // Set Progressing status, we are done reconciling, the health check go routine will update the status - updateStatus(observedGeneration, r.client, request.NamespacedName, cephv1.ConditionProgressing, buildStatusInfo(cephObjectStore)) + updateStatus(r.opManagerContext, observedGeneration, r.client, request.NamespacedName, cephv1.ConditionProgressing, buildStatusInfo(cephObjectStore)) // Return and do not requeue logger.Debug("done reconciling") diff --git a/pkg/operator/ceph/object/health.go b/pkg/operator/ceph/object/health.go index 996686d24c85..ab0cae0d0ebc 100644 --- a/pkg/operator/ceph/object/health.go +++ b/pkg/operator/ceph/object/health.go @@ -92,7 +92,7 @@ func (c *bucketChecker) checkObjectStore(context context.Context) { // check the object store health immediately before starting the loop err := c.checkObjectStoreHealth() if err != nil { - updateStatusBucket(c.client, c.namespacedName, cephv1.ConditionFailure, err.Error()) + updateStatusBucket(context, c.client, c.namespacedName, cephv1.ConditionFailure, err.Error()) logger.Debugf("failed to check rgw health for object store %q. %v", c.namespacedName.Name, err) } @@ -109,7 +109,7 @@ func (c *bucketChecker) checkObjectStore(context context.Context) { logger.Debugf("checking rgw health of object store %q", c.namespacedName.Name) err := c.checkObjectStoreHealth() if err != nil { - updateStatusBucket(c.client, c.namespacedName, cephv1.ConditionFailure, err.Error()) + updateStatusBucket(context, c.client, c.namespacedName, cephv1.ConditionFailure, err.Error()) logger.Debugf("failed to check rgw health for object store %q. 
%v", c.namespacedName.Name, err) } } @@ -182,7 +182,7 @@ func (c *bucketChecker) checkObjectStoreHealth() error { logger.Debugf("successfully checked object store endpoint for object store %q", c.namespacedName.String()) // Update the EndpointStatus in the CR to reflect the healthyness - updateStatusBucket(c.client, c.namespacedName, cephv1.ConditionConnected, "") + updateStatusBucket(c.objContext.clusterInfo.Context, c.client, c.namespacedName, cephv1.ConditionConnected, "") return nil } diff --git a/pkg/operator/ceph/object/objectstore.go b/pkg/operator/ceph/object/objectstore.go index 5277554c73aa..e35a5926b8e6 100644 --- a/pkg/operator/ceph/object/objectstore.go +++ b/pkg/operator/ceph/object/objectstore.go @@ -193,8 +193,7 @@ func deleteSingleSiteRealmAndPools(objContext *Context, spec cephv1.ObjectStoreS } // This is used for quickly getting the name of the realm, zone group, and zone for an object-store to pass into a Context -func getMultisiteForObjectStore(clusterdContext *clusterd.Context, spec *cephv1.ObjectStoreSpec, namespace, name string) (string, string, string, error) { - ctx := context.TODO() +func getMultisiteForObjectStore(ctx context.Context, clusterdContext *clusterd.Context, spec *cephv1.ObjectStoreSpec, namespace, name string) (string, string, string, error) { if spec.IsExternal() { // Currently external cluster with zones/zonegroup/realm are not supported, will be @@ -310,8 +309,7 @@ func DecodeSecret(secret *v1.Secret, keyName string) (string, error) { return string(realmKey), nil } -func GetRealmKeySecret(clusterdContext *clusterd.Context, realmName types.NamespacedName) (*v1.Secret, error) { - ctx := context.TODO() +func GetRealmKeySecret(ctx context.Context, clusterdContext *clusterd.Context, realmName types.NamespacedName) (*v1.Secret, error) { realmSecretName := realmName.Name + "-keys" realmSecret, err := clusterdContext.Clientset.CoreV1().Secrets(realmName.Namespace).Get(ctx, realmSecretName, metav1.GetOptions{}) if err != nil { @@ -338,11 +336,11 @@ func GetRealmKeyArgsFromSecret(realmSecret *v1.Secret, realmName types.Namespace return accessKeyArg, secretKeyArg, nil } -func GetRealmKeyArgs(clusterdContext *clusterd.Context, realmName, namespace string) (string, string, error) { +func GetRealmKeyArgs(ctx context.Context, clusterdContext *clusterd.Context, realmName, namespace string) (string, string, error) { realmNsName := types.NamespacedName{Namespace: namespace, Name: realmName} logger.Debugf("getting keys for realm %q", realmNsName.String()) - secret, err := GetRealmKeySecret(clusterdContext, realmNsName) + secret, err := GetRealmKeySecret(ctx, clusterdContext, realmNsName) if err != nil { return "", "", err } @@ -508,7 +506,7 @@ func createSystemUser(objContext *Context, namespace string) error { if code, ok := exec.ExitStatus(err); ok && code == int(syscall.EINVAL) { logger.Debugf("realm system user %q not found, running `radosgw-admin user create`", uid) - accessKeyArg, secretKeyArg, err := GetRealmKeyArgs(objContext.Context, objContext.Realm, namespace) + accessKeyArg, secretKeyArg, err := GetRealmKeyArgs(objContext.clusterInfo.Context, objContext.Context, objContext.Realm, namespace) if err != nil { return errors.Wrap(err, "failed to get keys for realm") } @@ -639,7 +637,7 @@ func deletePools(ctx *Context, spec cephv1.ObjectStoreSpec, lastStore bool) erro } if configurePoolsConcurrently() { - waitGroup, _ := errgroup.WithContext(context.TODO()) + waitGroup, _ := errgroup.WithContext(ctx.clusterInfo.Context) for _, pool := range pools { name := 
poolName(ctx.Name, pool) waitGroup.Go(func() error { @@ -761,7 +759,7 @@ func configurePoolsConcurrently() bool { func createSimilarPools(ctx *Context, pools []string, clusterSpec *cephv1.ClusterSpec, poolSpec cephv1.PoolSpec, pgCount string) error { // We have concurrency if configurePoolsConcurrently() { - waitGroup, _ := errgroup.WithContext(context.TODO()) + waitGroup, _ := errgroup.WithContext(ctx.clusterInfo.Context) for _, pool := range pools { // Avoid the loop re-using the same value with a closure pool := pool diff --git a/pkg/operator/ceph/object/objectstore_test.go b/pkg/operator/ceph/object/objectstore_test.go index 5df22d594f22..eba72e529771 100644 --- a/pkg/operator/ceph/object/objectstore_test.go +++ b/pkg/operator/ceph/object/objectstore_test.go @@ -558,6 +558,7 @@ func Test_createMultisite(t *testing.T) { func TestGetRealmKeySecret(t *testing.T) { ns := "my-ns" realmName := "my-realm" + ctx := context.TODO() t.Run("secret exists", func(t *testing.T) { secret := &v1.Secret{ @@ -576,7 +577,7 @@ func TestGetRealmKeySecret(t *testing.T) { Clientset: k8sfake.NewSimpleClientset(secret), } - secret, err := GetRealmKeySecret(c, types.NamespacedName{Namespace: ns, Name: realmName}) + secret, err := GetRealmKeySecret(ctx, c, types.NamespacedName{Namespace: ns, Name: realmName}) assert.NoError(t, err) assert.NotNil(t, secret) }) @@ -586,7 +587,7 @@ func TestGetRealmKeySecret(t *testing.T) { Clientset: k8sfake.NewSimpleClientset(), } - secret, err := GetRealmKeySecret(c, types.NamespacedName{Namespace: ns, Name: realmName}) + secret, err := GetRealmKeySecret(ctx, c, types.NamespacedName{Namespace: ns, Name: realmName}) assert.Error(t, err) assert.Nil(t, secret) }) @@ -648,6 +649,7 @@ func TestGetRealmKeyArgsFromSecret(t *testing.T) { func TestGetRealmKeyArgs(t *testing.T) { ns := "my-ns" realmName := "my-realm" + ctx := context.TODO() baseSecret := &v1.Secret{ TypeMeta: metav1.TypeMeta{ @@ -674,7 +676,7 @@ func TestGetRealmKeyArgs(t *testing.T) { Clientset: k8sfake.NewSimpleClientset(s), } - access, secret, err := GetRealmKeyArgs(c, realmName, ns) + access, secret, err := GetRealmKeyArgs(ctx, c, realmName, ns) assert.NoError(t, err) assert.Equal(t, "--access-key=my-access-key", access) assert.Equal(t, "--secret-key=my-secret-key", secret) @@ -685,7 +687,7 @@ func TestGetRealmKeyArgs(t *testing.T) { Clientset: k8sfake.NewSimpleClientset(), } - access, secret, err := GetRealmKeyArgs(c, realmName, ns) + access, secret, err := GetRealmKeyArgs(ctx, c, realmName, ns) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to get CephObjectRealm \"my-ns/my-realm\" keys secret") assert.Equal(t, "", access) @@ -700,7 +702,7 @@ func TestGetRealmKeyArgs(t *testing.T) { Clientset: k8sfake.NewSimpleClientset(s), } - access, secret, err := GetRealmKeyArgs(c, realmName, ns) + access, secret, err := GetRealmKeyArgs(ctx, c, realmName, ns) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to decode CephObjectRealm \"my-ns/my-realm\"") assert.Equal(t, "", access) diff --git a/pkg/operator/ceph/object/realm/controller.go b/pkg/operator/ceph/object/realm/controller.go index b221cfec00b8..d14f2b2102ab 100644 --- a/pkg/operator/ceph/object/realm/controller.go +++ b/pkg/operator/ceph/object/realm/controller.go @@ -216,7 +216,7 @@ func (r *ReconcileObjectRealm) pullCephRealm(realm *cephv1.CephObjectRealm) (rec realmArg := fmt.Sprintf("--rgw-realm=%s", realm.Name) urlArg := fmt.Sprintf("--url=%s", realm.Spec.Pull.Endpoint) logger.Debug("getting keys to pull realm for CephObjectRealm %q", 
realm.Name) - accessKeyArg, secretKeyArg, err := object.GetRealmKeyArgs(r.context, realm.Name, realm.Namespace) + accessKeyArg, secretKeyArg, err := object.GetRealmKeyArgs(r.opManagerContext, r.context, realm.Name, realm.Namespace) if err != nil { if kerrors.IsNotFound(err) { return waitForRequeueIfRealmNotReady, err @@ -264,7 +264,7 @@ func (r *ReconcileObjectRealm) createRealmKeys(realm *cephv1.CephObjectRealm) (r secretName := realm.Name + "-keys" // Check if the secret exists first, and check that it has the access information needed. - secret, err := object.GetRealmKeySecret(r.context, realmName) + secret, err := object.GetRealmKeySecret(r.opManagerContext, r.context, realmName) if err == nil { // secret exists, now verify access info. We don't need the args, but we do want to get the // error if the args can't be built diff --git a/pkg/operator/ceph/object/status.go b/pkg/operator/ceph/object/status.go index 34a839ff93fb..ee34182c9adc 100644 --- a/pkg/operator/ceph/object/status.go +++ b/pkg/operator/ceph/object/status.go @@ -31,17 +31,17 @@ import ( ) func (r *ReconcileCephObjectStore) setFailedStatus(observedGeneration int64, name types.NamespacedName, errMessage string, err error) (reconcile.Result, error) { - updateStatus(observedGeneration, r.client, name, cephv1.ConditionFailure, map[string]string{}) + updateStatus(r.opManagerContext, observedGeneration, r.client, name, cephv1.ConditionFailure, map[string]string{}) return reconcile.Result{}, errors.Wrapf(err, "%s", errMessage) } // updateStatus updates an object with a given status -func updateStatus(observedGeneration int64, client client.Client, namespacedName types.NamespacedName, status cephv1.ConditionType, info map[string]string) { +func updateStatus(ctx context.Context, observedGeneration int64, client client.Client, namespacedName types.NamespacedName, status cephv1.ConditionType, info map[string]string) { // Updating the status is important to users, but we can still keep operating if there is a // failure. Retry a few times to give it our best effort attempt. err := retry.RetryOnConflict(retry.DefaultRetry, func() error { objectStore := &cephv1.CephObjectStore{} - if err := client.Get(context.TODO(), namespacedName, objectStore); err != nil { + if err := client.Get(ctx, namespacedName, objectStore); err != nil { if kerrors.IsNotFound(err) { logger.Debug("CephObjectStore resource not found. Ignoring since object must be deleted.") return nil @@ -76,12 +76,12 @@ func updateStatus(observedGeneration int64, client client.Client, namespacedName } // updateStatusBucket updates an object with a given status -func updateStatusBucket(client client.Client, name types.NamespacedName, status cephv1.ConditionType, details string) { +func updateStatusBucket(ctx context.Context, client client.Client, name types.NamespacedName, status cephv1.ConditionType, details string) { // Updating the status is important to users, but we can still keep operating if there is a // failure. Retry a few times to give it our best effort attempt. err := retry.RetryOnConflict(retry.DefaultRetry, func() error { objectStore := &cephv1.CephObjectStore{} - if err := client.Get(context.TODO(), name, objectStore); err != nil { + if err := client.Get(ctx, name, objectStore); err != nil { if kerrors.IsNotFound(err) { logger.Debug("CephObjectStore resource not found. 
Ignoring since object must be deleted.") return nil diff --git a/pkg/operator/ceph/object/topic/controller_test.go b/pkg/operator/ceph/object/topic/controller_test.go index c716b30fa37b..67cbdfa27b7f 100644 --- a/pkg/operator/ceph/object/topic/controller_test.go +++ b/pkg/operator/ceph/object/topic/controller_test.go @@ -86,7 +86,7 @@ func TestCephBucketTopicController(t *testing.T) { }, }, } - clusterInfo := cephclient.AdminClusterInfo(namespace, "rook") + clusterInfo := cephclient.AdminClusterInfo(ctx, namespace, "rook") clusterSpec := cephv1.ClusterSpec{} req := reconcile.Request{ NamespacedName: types.NamespacedName{ diff --git a/pkg/operator/ceph/object/zone/controller.go b/pkg/operator/ceph/object/zone/controller.go index e1b3fde1940f..58db4e225eca 100644 --- a/pkg/operator/ceph/object/zone/controller.go +++ b/pkg/operator/ceph/object/zone/controller.go @@ -273,7 +273,7 @@ func (r *ReconcileObjectZone) createPoolsAndZone(objContext *object.Context, zon } logger.Debugf("created pools ceph zone %q", zone.Name) - accessKeyArg, secretKeyArg, err := object.GetRealmKeyArgs(r.context, realmName, zone.Namespace) + accessKeyArg, secretKeyArg, err := object.GetRealmKeyArgs(r.opManagerContext, r.context, realmName, zone.Namespace) if err != nil { return errors.Wrap(err, "failed to get keys for realm") } diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go index 8d70df9decc9..6861f164d0e4 100644 --- a/pkg/operator/ceph/pool/controller.go +++ b/pkg/operator/ceph/pool/controller.go @@ -173,7 +173,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // The CR was just created, initializing status fields if cephBlockPool.Status == nil { - updateStatus(r.client, request.NamespacedName, cephv1.ConditionProgressing, nil, k8sutil.ObservedGenerationNotAvailable) + updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionProgressing, nil, k8sutil.ObservedGenerationNotAvailable) } // Make sure a CephCluster is present otherwise do nothing @@ -281,7 +281,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile logger.Info(opcontroller.OperatorNotInitializedMessage) return opcontroller.WaitForRequeueIfOperatorNotInitialized, *cephBlockPool, nil } - updateStatus(r.client, request.NamespacedName, cephv1.ConditionFailure, nil, k8sutil.ObservedGenerationNotAvailable) + updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionFailure, nil, k8sutil.ObservedGenerationNotAvailable) return reconcileResponse, *cephBlockPool, errors.Wrapf(err, "failed to create pool %q.", cephBlockPool.GetName()) } @@ -298,7 +298,7 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // Always create a bootstrap peer token in case another cluster wants to add us as a peer reconcileResponse, err = opcontroller.CreateBootstrapPeerSecret(r.context, clusterInfo, cephBlockPool, k8sutil.NewOwnerInfo(cephBlockPool, r.scheme)) if err != nil { - updateStatus(r.client, request.NamespacedName, cephv1.ConditionFailure, nil, k8sutil.ObservedGenerationNotAvailable) + updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionFailure, nil, k8sutil.ObservedGenerationNotAvailable) return reconcileResponse, *cephBlockPool, errors.Wrapf(err, "failed to create rbd-mirror bootstrap peer for pool %q.", cephBlockPool.GetName()) } @@ -330,13 +330,13 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // update 
ObservedGeneration in status at the end of reconcile // Set Ready status, we are done reconciling - updateStatus(r.client, request.NamespacedName, cephv1.ConditionReady, opcontroller.GenerateStatusInfo(cephBlockPool), observedGeneration) + updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionReady, opcontroller.GenerateStatusInfo(cephBlockPool), observedGeneration) // If not mirrored there is no Status Info field to fulfil } else { // update ObservedGeneration in status at the end of reconcile // Set Ready status, we are done reconciling - updateStatus(r.client, request.NamespacedName, cephv1.ConditionReady, nil, observedGeneration) + updateStatus(r.opManagerContext, r.client, request.NamespacedName, cephv1.ConditionReady, nil, observedGeneration) // Stop monitoring the mirroring status of this pool if blockPoolContextsExists && r.blockPoolContexts[blockPoolChannelKey].started { @@ -429,7 +429,7 @@ func configureRBDStats(clusterContext *clusterd.Context, clusterInfo *cephclient namespaceListOpt := client.InNamespace(clusterInfo.Namespace) cephBlockPoolList := &cephv1.CephBlockPoolList{} var enableStatsForPools []string - err := clusterContext.Client.List(context.TODO(), cephBlockPoolList, namespaceListOpt) + err := clusterContext.Client.List(clusterInfo.Context, cephBlockPoolList, namespaceListOpt) if err != nil { return errors.Wrap(err, "failed to retrieve list of CephBlockPool") } diff --git a/pkg/operator/ceph/pool/status.go b/pkg/operator/ceph/pool/status.go index 8a70cfdfd58e..85aaa2fbbd66 100644 --- a/pkg/operator/ceph/pool/status.go +++ b/pkg/operator/ceph/pool/status.go @@ -30,9 +30,9 @@ import ( ) // updateStatus updates a pool CR with the given status -func updateStatus(client client.Client, poolName types.NamespacedName, status cephv1.ConditionType, info map[string]string, observedGeneration int64) { +func updateStatus(ctx context.Context, client client.Client, poolName types.NamespacedName, status cephv1.ConditionType, info map[string]string, observedGeneration int64) { pool := &cephv1.CephBlockPool{} - err := client.Get(context.TODO(), poolName, pool) + err := client.Get(ctx, poolName, pool) if err != nil { if kerrors.IsNotFound(err) { logger.Debug("CephBlockPool resource not found. Ignoring since object must be deleted.") diff --git a/pkg/operator/ceph/predicate.go b/pkg/operator/ceph/predicate.go index 2b159f4f7dea..315ab3eb01a2 100644 --- a/pkg/operator/ceph/predicate.go +++ b/pkg/operator/ceph/predicate.go @@ -30,14 +30,14 @@ import ( ) // predicateOpController is the predicate function to trigger reconcile on operator configuration cm change -func predicateController(client client.Client) predicate.Funcs { +func predicateController(ctx context.Context, client client.Client) predicate.Funcs { return predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { if cm, ok := e.Object.(*v1.ConfigMap); ok { return cm.Name == controller.OperatorSettingConfigMapName } else if s, ok := e.Object.(*v1.Secret); ok { if s.Name == admissionControllerAppName { - err := client.Get(context.TODO(), types.NamespacedName{Name: admissionControllerAppName, Namespace: e.Object.GetNamespace()}, &v1.Service{}) + err := client.Get(ctx, types.NamespacedName{Name: admissionControllerAppName, Namespace: e.Object.GetNamespace()}, &v1.Service{}) if err != nil { if kerrors.IsNotFound(err) { // If the service is present we don't need to reload again. 
If we don't perform diff --git a/pkg/operator/ceph/reporting/reporting.go b/pkg/operator/ceph/reporting/reporting.go index 1576c62e2f31..652d778597f6 100644 --- a/pkg/operator/ceph/reporting/reporting.go +++ b/pkg/operator/ceph/reporting/reporting.go @@ -164,7 +164,7 @@ func ReportReconcileResult( // 2. as a condition on the object (added to the object's conditions list given) // 3. as the returned error which should be included in the FailedReconcile message func ReportDeletionBlockedDueToDependents( - logger *capnslog.PackageLogger, client client.Client, obj cephv1.StatusConditionGetter, deps *dependents.DependentList, + ctx context.Context, logger *capnslog.PackageLogger, client client.Client, obj cephv1.StatusConditionGetter, deps *dependents.DependentList, ) error { kind := obj.GetObjectKind().GroupVersionKind().Kind nsName := types.NamespacedName{ @@ -179,7 +179,7 @@ func ReportDeletionBlockedDueToDependents( // 2. condition blockedCond := dependents.DeletionBlockedDueToDependentsCondition(true, blockedMsg) err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := client.Get(context.TODO(), nsName, obj); err != nil { + if err := client.Get(ctx, nsName, obj); err != nil { return errors.Wrapf(err, "failed to get latest %s %q", kind, nsName.String()) } if err := UpdateStatusCondition(client, obj, blockedCond); err != nil { @@ -201,7 +201,7 @@ func ReportDeletionBlockedDueToDependents( // 2. as an event on the object (via the given event recorder) // 3. as a condition on the object (added to the object's conditions list given) func ReportDeletionNotBlockedDueToDependents( - logger *capnslog.PackageLogger, client client.Client, recorder record.EventRecorder, obj cephv1.StatusConditionGetter, + ctx context.Context, logger *capnslog.PackageLogger, client client.Client, recorder record.EventRecorder, obj cephv1.StatusConditionGetter, ) { kind := obj.GetObjectKind().GroupVersionKind().Kind nsName := types.NamespacedName{ @@ -220,7 +220,7 @@ func ReportDeletionNotBlockedDueToDependents( // 3. 
condition unblockedCond := dependents.DeletionBlockedDueToDependentsCondition(false, safeMsg) err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := client.Get(context.TODO(), nsName, obj); err != nil { + if err := client.Get(ctx, nsName, obj); err != nil { return errors.Wrapf(err, "failed to get latest %s %q", kind, nsName.String()) } if err := UpdateStatusCondition(client, obj, unblockedCond); err != nil { diff --git a/pkg/util/exec/exec_pod.go b/pkg/util/exec/exec_pod.go index 73b4b105ec30..5d17d22391d2 100644 --- a/pkg/util/exec/exec_pod.go +++ b/pkg/util/exec/exec_pod.go @@ -92,9 +92,9 @@ func (e *RemotePodCommandExecutor) ExecWithOptions(options ExecOptions) (string, // ExecCommandInContainerWithFullOutput executes a command in the // specified container and return stdout, stderr and error -func (e *RemotePodCommandExecutor) ExecCommandInContainerWithFullOutput(appLabel, containerName, namespace string, cmd ...string) (string, string, error) { +func (e *RemotePodCommandExecutor) ExecCommandInContainerWithFullOutput(ctx context.Context, appLabel, containerName, namespace string, cmd ...string) (string, string, error) { options := metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", appLabel)} - pods, err := e.ClientSet.CoreV1().Pods(namespace).List(context.TODO(), options) + pods, err := e.ClientSet.CoreV1().Pods(namespace).List(ctx, options) if err != nil { return "", "", err } @@ -130,6 +130,6 @@ func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader, }) } -func (e *RemotePodCommandExecutor) ExecCommandInContainerWithFullOutputWithTimeout(appLabel, containerName, namespace string, cmd ...string) (string, string, error) { - return e.ExecCommandInContainerWithFullOutput(appLabel, containerName, namespace, append([]string{"timeout", strconv.Itoa(int(CephCommandsTimeout.Seconds()))}, cmd...)...) +func (e *RemotePodCommandExecutor) ExecCommandInContainerWithFullOutputWithTimeout(ctx context.Context, appLabel, containerName, namespace string, cmd ...string) (string, string, error) { + return e.ExecCommandInContainerWithFullOutput(ctx, appLabel, containerName, namespace, append([]string{"timeout", strconv.Itoa(int(CephCommandsTimeout.Seconds()))}, cmd...)...) } diff --git a/tests/framework/utils/k8s_helper.go b/tests/framework/utils/k8s_helper.go index f57440c11e11..84dbf23624c7 100644 --- a/tests/framework/utils/k8s_helper.go +++ b/tests/framework/utils/k8s_helper.go @@ -222,7 +222,7 @@ func (k8sh *K8sHelper) ExecRemoteWithRetry(retries int, namespace, command strin var output, stderr string cliFinal := append([]string{command}, commandArgs...) for i := 0; i < retries; i++ { - output, stderr, err = k8sh.remoteExecutor.ExecCommandInContainerWithFullOutput("rook-ceph-tools", "rook-ceph-tools", namespace, cliFinal...) + output, stderr, err = k8sh.remoteExecutor.ExecCommandInContainerWithFullOutput(context.TODO(), "rook-ceph-tools", "rook-ceph-tools", namespace, cliFinal...) if err == nil { return output, nil }