diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go index 7124265ba475..250c10f11750 100644 --- a/pkg/operator/ceph/object/admin.go +++ b/pkg/operator/ceph/object/admin.go @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext( return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user") } - httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec) + httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec) if err != nil { return nil, err } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index f4c8306aa3f4..34369fea12f5 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -442,7 +442,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c // Start monitoring if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled { - r.startMonitoring(cephObjectStore, objContext, namespacedName) + err = r.startMonitoring(cephObjectStore, objContext, namespacedName) + if err != nil { + return reconcile.Result{}, err + } } return reconcile.Result{}, nil @@ -507,22 +510,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1 return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil } -func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) { +func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error { // Start monitoring object store if r.objectStoreChannels[objectstore.Name].monitoringRunning { - logger.Debug("external rgw endpoint monitoring go routine already running!") - return + logger.Info("external rgw endpoint monitoring go routine already running!") + return nil } rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec) if err != nil { - logger.Error(err) - return + return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String()) } - logger.Info("starting rgw healthcheck") + logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String()) go rgwChecker.checkObjectStore(r.objectStoreChannels[objectstore.Name].stopChan) // Set the monitoring flag so we don't start more than one go routine r.objectStoreChannels[objectstore.Name].monitoringRunning = true + + return nil } diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go index 25a0e3a86e0a..9a59a9244dc8 100644 --- a/pkg/operator/ceph/object/controller_test.go +++ b/pkg/operator/ceph/object/controller_test.go @@ -19,11 +19,13 @@ package object import ( "context" + "net/http" "os" "testing" "time" "github.com/coreos/pkg/capnslog" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/client/clientset/versioned/scheme" @@ -281,59 +283,59 @@ func TestCephObjectStoreController(t *testing.T) { capnslog.SetGlobalLogLevel(capnslog.DEBUG) os.Setenv("ROOK_LOG_LEVEL", "DEBUG") - // - // TEST 1 SETUP - // - // FAILURE because no CephCluster - // - // A Pool resource with metadata and spec. - objectStore := &cephv1.CephObjectStore{ - ObjectMeta: metav1.ObjectMeta{ - Name: store, - Namespace: namespace, - }, - Spec: cephv1.ObjectStoreSpec{}, - TypeMeta: controllerTypeMeta, - } - objectStore.Spec.Gateway.Port = 80 + setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore { + // A Pool resource with metadata and spec. + objectStore := &cephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: store, + Namespace: namespace, + }, + Spec: cephv1.ObjectStoreSpec{}, + TypeMeta: controllerTypeMeta, + } + objectStore.Spec.Gateway.Port = 80 - // Objects to track in the fake client. - object := []runtime.Object{ - objectStore, - } + // Objects to track in the fake client. + objects := []runtime.Object{ + objectStore, + } - executor := &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - if args[0] == "versions" { - return dummyVersionsRaw, nil - } - return "", nil - }, - } - clientset := test.New(t, 3) - c := &clusterd.Context{ - Executor: executor, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: clientset, - } + for i := range additionalObjects { + objects = append(objects, additionalObjects[i]) + } - // Register operator types with the runtime scheme. - s := scheme.Scheme - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + return "", nil + }, + } + clientset := test.New(t, 3) + c := &clusterd.Context{ + Executor: executor, + RookClientset: rookclient.NewSimpleClientset(), + Clientset: clientset, + } - // Create a fake client to mock API calls. - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r := &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + // Register operator types with the runtime scheme. + s := scheme.Scheme + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + + // Create a fake client to mock API calls. + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + // Create a ReconcileCephObjectStore object with the scheme and fake client. + r := &ReconcileCephObjectStore{ + client: cl, + scheme: s, + context: c, + objectStoreChannels: make(map[string]*objectStoreHealth), + recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + } + + return r } // Mock request to simulate Reconcile() being called on an event for a @@ -344,91 +346,84 @@ func TestCephObjectStoreController(t *testing.T) { Namespace: namespace, }, } - logger.Info("STARTING PHASE 1") - res, err := r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.True(t, res.Requeue) - logger.Info("PHASE 1 DONE") - - // - // TEST 2: - // - // FAILURE we have a cluster but it's not ready - // - cephCluster := &cephv1.CephCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - Namespace: namespace, - }, - Status: cephv1.ClusterStatus{ - Phase: "", - CephStatus: &cephv1.CephStatus{ - Health: "", + + t.Run("error - no ceph cluster", func(t *testing.T) { + r := setupNewEnvironment() + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.True(t, res.Requeue) + }) + + t.Run("error - ceph cluster not ready", func(t *testing.T) { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, }, - }, - } - object = append(object, cephCluster) - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - } - logger.Info("STARTING PHASE 2") - res, err = r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.True(t, res.Requeue) - logger.Info("PHASE 2 DONE") + Status: cephv1.ClusterStatus{ + Phase: "", + CephStatus: &cephv1.CephStatus{ + Health: "", + }, + }, + } - // - // TEST 3: - // - // SUCCESS! The CephCluster is ready - // + r := setupNewEnvironment(cephCluster) - // Mock clusterInfo - secrets := map[string][]byte{ - "fsid": []byte(name), - "mon-secret": []byte("monsecret"), - "admin-secret": []byte("adminsecret"), - } - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rook-ceph-mon", - Namespace: namespace, - }, - Data: secrets, - Type: k8sutil.RookType, - } - _, err = c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) - assert.NoError(t, err) + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.True(t, res.Requeue) + }) - // Add ready status to the CephCluster - cephCluster.Status.Phase = k8sutil.ReadyStatus - cephCluster.Status.CephStatus.Health = "HEALTH_OK" + // set up an environment that has a ready ceph cluster, and return the reconciler for it + setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: k8sutil.ReadyStatus, + CephStatus: &cephv1.CephStatus{ + Health: "HEALTH_OK", + }, + }, + } - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() + r := setupNewEnvironment(cephCluster) - // Override executor with the new ceph status and more content - executor = &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - if args[0] == "auth" && args[1] == "get-or-create-key" { - return rgwCephAuthGetOrCreateKey, nil - } - if args[0] == "versions" { - return dummyVersionsRaw, nil - } - if args[0] == "osd" && args[1] == "lspools" { - // ceph actually outputs this all on one line, but this parses the same - return `[ + secrets := map[string][]byte{ + "fsid": []byte(name), + "mon-secret": []byte("monsecret"), + "admin-secret": []byte("adminsecret"), + } + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rook-ceph-mon", + Namespace: namespace, + }, + Data: secrets, + Type: k8sutil.RookType, + } + _, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) + assert.NoError(t, err) + + // Override executor with the new ceph status and more content + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + if args[0] == "auth" && args[1] == "get-or-create-key" { + return rgwCephAuthGetOrCreateKey, nil + } + if args[0] == "versions" { + return dummyVersionsRaw, nil + } + if args[0] == "osd" && args[1] == "lspools" { + // ceph actually outputs this all on one line, but this parses the same + return `[ {"poolnum":1,"poolname":"replicapool"}, {"poolnum":2,"poolname":"device_health_metrics"}, {"poolnum":3,"poolname":".rgw.root"}, @@ -439,49 +434,64 @@ func TestCephObjectStoreController(t *testing.T) { {"poolnum":8,"poolname":"my-store.rgw.meta"}, {"poolnum":9,"poolname":"my-store.rgw.buckets.data"} ]`, nil - } - return "", nil - }, - MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { - if args[0] == "realm" && args[1] == "list" { - return realmListJSON, nil - } - if args[0] == "realm" && args[1] == "get" { - return realmGetJSON, nil - } - if args[0] == "zonegroup" && args[1] == "get" { - return zoneGroupGetJSON, nil - } - if args[0] == "zone" && args[1] == "get" { - return zoneGetJSON, nil - } - if args[0] == "user" { - return userCreateJSON, nil - } - return "", nil - }, - } - c.Executor = executor + } + return "", nil + }, + MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { + if args[0] == "realm" && args[1] == "list" { + return realmListJSON, nil + } + if args[0] == "realm" && args[1] == "get" { + return realmGetJSON, nil + } + if args[0] == "zonegroup" && args[1] == "get" { + return zoneGroupGetJSON, nil + } + if args[0] == "zone" && args[1] == "get" { + return zoneGetJSON, nil + } + if args[0] == "user" { + return userCreateJSON, nil + } + return "", nil + }, + } + r.context.Executor = executor - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreChannels: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + return r } - logger.Info("STARTING PHASE 3") - res, err = r.Reconcile(ctx, req) - assert.NoError(t, err) - assert.False(t, res.Requeue) - err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) - assert.NoError(t, err) - assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) - assert.NotEmpty(t, objectStore.Status.Info["endpoint"], objectStore) - assert.Equal(t, "http://rook-ceph-rgw-my-store.rook-ceph.svc:80", objectStore.Status.Info["endpoint"], objectStore) - logger.Info("PHASE 3 DONE") + t.Run("error - failed to start health checker", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + // cause a failure when creating the admin ops api for the health check + origHTTPClientFunc := genObjectStoreHTTPClientFunc + genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) { + return nil, []byte{}, errors.New("induced error creating admin ops API connection") + } + defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }() + + _, err := r.Reconcile(ctx, req) + assert.Error(t, err) + // we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue) + assert.Contains(t, err.Error(), "failed to start rgw health checker") + assert.Contains(t, err.Error(), "induced error creating admin ops API connection") + }) + + t.Run("success - object store is running", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + + objectStore := &cephv1.CephObjectStore{} + err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) + assert.NoError(t, err) + assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) + assert.NotEmpty(t, objectStore.Status.Info["endpoint"], objectStore) + assert.Equal(t, "http://rook-ceph-rgw-my-store.rook-ceph.svc:80", objectStore.Status.Info["endpoint"], objectStore) + }) } func TestCephObjectStoreControllerMultisite(t *testing.T) { diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go index 9865cb5c8789..e922731cd886 100644 --- a/pkg/operator/ceph/object/rgw.go +++ b/pkg/operator/ceph/object/rgw.go @@ -349,7 +349,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec) return tlsCert, nil } -func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { +// Allow overriding this function for unit tests to mock the admin ops api +var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient + +func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name) c := &http.Client{} tlsCert := []byte{}