From 2de041839c985f3cca8ffab4bfa7a6087fe4f948 Mon Sep 17 00:00:00 2001 From: Blaine Gardner Date: Mon, 13 Sep 2021 17:38:28 -0600 Subject: [PATCH] ceph: retry object health check if creation fails If the CephObjectStore health checker fails to be created, return a reconcile failure so that the reconcile will be run again and Rook will retry creating the health checker. This also means that Rook will not list the CephObjectStore as ready if the health checker can't be started. Resolved backport conflicts in the below files: pkg/operator/ceph/object/controller.go - revert monitoring routine struct change from 1.7 to master pkg/operator/ceph/object/controller_test.go - use master branch's rearchitected test harness Signed-off-by: Blaine Gardner (cherry picked from commit 5383ba2df2c159619bcb925880c2c7ff01fb28ca) --- pkg/operator/ceph/object/admin.go | 2 +- pkg/operator/ceph/object/controller.go | 18 +- pkg/operator/ceph/object/controller_test.go | 346 ++++++++++---------- pkg/operator/ceph/object/rgw.go | 5 +- 4 files changed, 194 insertions(+), 177 deletions(-) diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go index 7124265ba475..250c10f11750 100644 --- a/pkg/operator/ceph/object/admin.go +++ b/pkg/operator/ceph/object/admin.go @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext( return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user") } - httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec) + httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec) if err != nil { return nil, err } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index f4c8306aa3f4..34369fea12f5 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -442,7 +442,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c // Start monitoring if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled { - r.startMonitoring(cephObjectStore, objContext, namespacedName) + err = r.startMonitoring(cephObjectStore, objContext, namespacedName) + if err != nil { + return reconcile.Result{}, err + } } return reconcile.Result{}, nil @@ -507,22 +510,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1 return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil } -func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) { +func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error { // Start monitoring object store if r.objectStoreChannels[objectstore.Name].monitoringRunning { - logger.Debug("external rgw endpoint monitoring go routine already running!") - return + logger.Info("external rgw endpoint monitoring go routine already running!") + return nil } rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec) if err != nil { - logger.Error(err) - return + return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String()) } - logger.Info("starting rgw healthcheck") + logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String()) go rgwChecker.checkObjectStore(r.objectStoreChannels[objectstore.Name].stopChan) // Set the monitoring flag so we don't start more than one go routine r.objectStoreChannels[objectstore.Name].monitoringRunning = true + + return nil } diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go index 25a0e3a86e0a..9a59a9244dc8 100644 --- a/pkg/operator/ceph/object/controller_test.go +++ b/pkg/operator/ceph/object/controller_test.go @@ -19,11 +19,13 @@ package object import ( "context" + "net/http" "os" "testing" "time" "github.com/coreos/pkg/capnslog" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/client/clientset/versioned/scheme" @@ -281,59 +283,59 @@ func TestCephObjectStoreController(t *testing.T) { capnslog.SetGlobalLogLevel(capnslog.DEBUG) os.Setenv("ROOK_LOG_LEVEL", "DEBUG") - // - // TEST 1 SETUP - // - // FAILURE because no CephCluster - // - // A Pool resource with metadata and spec. - objectStore := &cephv1.CephObjectStore{ - ObjectMeta: metav1.ObjectMeta{ - Name: store, - Namespace: namespace, - }, - Spec: cephv1.ObjectStoreSpec{}, - TypeMeta: controllerTypeMeta, - } - objectStore.Spec.Gateway.Port = 80 + setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore { + // A Pool resource with metadata and spec. + objectStore := &cephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: store, + Namespace: namespace, + }, + Spec: cephv1.ObjectStoreSpec{}, + TypeMeta: controllerTypeMeta, + } + objectStore.Spec.Gateway.Port = 80 - // Objects to track in the fake client. - object := []runtime.Object{ - objectStore, - } + // Objects to track in the fake client. + objects := []runtime.Object{ + objectStore, + } - executor := &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - if args[0] == "versions" { - return dummyVersionsRaw, nil - } - return "", nil - }, - } - clientset := test.New(t, 3) - c := &clusterd.Context{ - Executor: executor, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: clientset, - } + for i := range additionalObjects { + objects = append(objects, additionalObjects[i]) + } - // Register operator types with the runtime scheme. - s := scheme.Scheme - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + return "", nil + }, + } + clientset := test.New(t, 3) + c := &clusterd.Context{ + Executor: executor, + RookClientset: rookclient.NewSimpleClientset(), + Clientset: clientset, + } - // Create a fake client to mock API calls. - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r := &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + // Register operator types with the runtime scheme. + s := scheme.Scheme + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + + // Create a fake client to mock API calls. + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + // Create a ReconcileCephObjectStore object with the scheme and fake client. + r := &ReconcileCephObjectStore{ + client: cl, + scheme: s, + context: c, + objectStoreChannels: make(map[string]*objectStoreHealth), + recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + } + + return r } // Mock request to simulate Reconcile() being called on an event for a @@ -344,91 +346,84 @@ func TestCephObjectStoreController(t *testing.T) { Namespace: namespace, }, } - logger.Info("STARTING PHASE 1") - res, err := r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.True(t, res.Requeue) - logger.Info("PHASE 1 DONE") - - // - // TEST 2: - // - // FAILURE we have a cluster but it's not ready - // - cephCluster := &cephv1.CephCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - Namespace: namespace, - }, - Status: cephv1.ClusterStatus{ - Phase: "", - CephStatus: &cephv1.CephStatus{ - Health: "", + + t.Run("error - no ceph cluster", func(t *testing.T) { + r := setupNewEnvironment() + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.True(t, res.Requeue) + }) + + t.Run("error - ceph cluster not ready", func(t *testing.T) { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, }, - }, - } - object = append(object, cephCluster) - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - } - logger.Info("STARTING PHASE 2") - res, err = r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.True(t, res.Requeue) - logger.Info("PHASE 2 DONE") + Status: cephv1.ClusterStatus{ + Phase: "", + CephStatus: &cephv1.CephStatus{ + Health: "", + }, + }, + } - // - // TEST 3: - // - // SUCCESS! The CephCluster is ready - // + r := setupNewEnvironment(cephCluster) - // Mock clusterInfo - secrets := map[string][]byte{ - "fsid": []byte(name), - "mon-secret": []byte("monsecret"), - "admin-secret": []byte("adminsecret"), - } - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rook-ceph-mon", - Namespace: namespace, - }, - Data: secrets, - Type: k8sutil.RookType, - } - _, err = c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) - assert.NoError(t, err) + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.True(t, res.Requeue) + }) - // Add ready status to the CephCluster - cephCluster.Status.Phase = k8sutil.ReadyStatus - cephCluster.Status.CephStatus.Health = "HEALTH_OK" + // set up an environment that has a ready ceph cluster, and return the reconciler for it + setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: k8sutil.ReadyStatus, + CephStatus: &cephv1.CephStatus{ + Health: "HEALTH_OK", + }, + }, + } - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() + r := setupNewEnvironment(cephCluster) - // Override executor with the new ceph status and more content - executor = &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - if args[0] == "auth" && args[1] == "get-or-create-key" { - return rgwCephAuthGetOrCreateKey, nil - } - if args[0] == "versions" { - return dummyVersionsRaw, nil - } - if args[0] == "osd" && args[1] == "lspools" { - // ceph actually outputs this all on one line, but this parses the same - return `[ + secrets := map[string][]byte{ + "fsid": []byte(name), + "mon-secret": []byte("monsecret"), + "admin-secret": []byte("adminsecret"), + } + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rook-ceph-mon", + Namespace: namespace, + }, + Data: secrets, + Type: k8sutil.RookType, + } + _, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) + assert.NoError(t, err) + + // Override executor with the new ceph status and more content + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + if args[0] == "auth" && args[1] == "get-or-create-key" { + return rgwCephAuthGetOrCreateKey, nil + } + if args[0] == "versions" { + return dummyVersionsRaw, nil + } + if args[0] == "osd" && args[1] == "lspools" { + // ceph actually outputs this all on one line, but this parses the same + return `[ {"poolnum":1,"poolname":"replicapool"}, {"poolnum":2,"poolname":"device_health_metrics"}, {"poolnum":3,"poolname":".rgw.root"}, @@ -439,49 +434,64 @@ func TestCephObjectStoreController(t *testing.T) { {"poolnum":8,"poolname":"my-store.rgw.meta"}, {"poolnum":9,"poolname":"my-store.rgw.buckets.data"} ]`, nil - } - return "", nil - }, - MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { - if args[0] == "realm" && args[1] == "list" { - return realmListJSON, nil - } - if args[0] == "realm" && args[1] == "get" { - return realmGetJSON, nil - } - if args[0] == "zonegroup" && args[1] == "get" { - return zoneGroupGetJSON, nil - } - if args[0] == "zone" && args[1] == "get" { - return zoneGetJSON, nil - } - if args[0] == "user" { - return userCreateJSON, nil - } - return "", nil - }, - } - c.Executor = executor + } + return "", nil + }, + MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { + if args[0] == "realm" && args[1] == "list" { + return realmListJSON, nil + } + if args[0] == "realm" && args[1] == "get" { + return realmGetJSON, nil + } + if args[0] == "zonegroup" && args[1] == "get" { + return zoneGroupGetJSON, nil + } + if args[0] == "zone" && args[1] == "get" { + return zoneGetJSON, nil + } + if args[0] == "user" { + return userCreateJSON, nil + } + return "", nil + }, + } + r.context.Executor = executor - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + return r } - logger.Info("STARTING PHASE 3") - res, err = r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.False(t, res.Requeue) - err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) - assert.NoError(t, err) - assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) - assert.NotEmpty(t, objectStore.Status.Info["endpoint"], objectStore) - assert.Equal(t, "http://rook-ceph-rgw-my-store.rook-ceph.svc:80", objectStore.Status.Info["endpoint"], objectStore) - logger.Info("PHASE 3 DONE") + t.Run("error - failed to start health checker", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + // cause a failure when creating the admin ops api for the health check + origHTTPClientFunc := genObjectStoreHTTPClientFunc + genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) { + return nil, []byte{}, errors.New("induced error creating admin ops API connection") + } + defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }() + + _, err := r.Reconcile(ctx, req) + assert.Error(t, err) + // we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue) + assert.Contains(t, err.Error(), "failed to start rgw health checker") + assert.Contains(t, err.Error(), "induced error creating admin ops API connection") + }) + + t.Run("success - object store is running", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + + objectStore := &cephv1.CephObjectStore{} + err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) + assert.NoError(t, err) + assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) + assert.NotEmpty(t, objectStore.Status.Info["endpoint"], objectStore) + assert.Equal(t, "http://rook-ceph-rgw-my-store.rook-ceph.svc:80", objectStore.Status.Info["endpoint"], objectStore) + }) } func TestCephObjectStoreControllerMultisite(t *testing.T) { diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go index 9865cb5c8789..e922731cd886 100644 --- a/pkg/operator/ceph/object/rgw.go +++ b/pkg/operator/ceph/object/rgw.go @@ -349,7 +349,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec) return tlsCert, nil } -func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { +// Allow overriding this function for unit tests to mock the admin ops api +var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient + +func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name) c := &http.Client{} tlsCert := []byte{}