diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go index 7124265ba475..250c10f11750 100644 --- a/pkg/operator/ceph/object/admin.go +++ b/pkg/operator/ceph/object/admin.go @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext( return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user") } - httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec) + httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec) if err != nil { return nil, err } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index 24e48095b66d..85e3370f06e0 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c // Start monitoring if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled { - r.startMonitoring(cephObjectStore, objContext, namespacedName) + err = r.startMonitoring(cephObjectStore, objContext, namespacedName) + if err != nil { + return reconcile.Result{}, err + } } return reconcile.Result{}, nil @@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1 return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil } -func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) { +func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error { // Start monitoring object store if r.objectStoreContexts[objectstore.Name].started { logger.Info("external rgw endpoint monitoring go routine already running!") - return + return nil } rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec) if err != nil { - logger.Error(err) - return + return 
errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String()) } - logger.Info("starting rgw healthcheck") + logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String()) go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx) + + // Set the monitoring flag so we don't start more than one go routine r.objectStoreContexts[objectstore.Name].started = true + + return nil } diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go index 6c182cae860c..8bc9282c39c1 100644 --- a/pkg/operator/ceph/object/controller_test.go +++ b/pkg/operator/ceph/object/controller_test.go @@ -19,11 +19,13 @@ package object import ( "context" + "net/http" "os" "testing" "time" "github.com/coreos/pkg/capnslog" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/client/clientset/versioned/scheme" @@ -283,52 +285,60 @@ func TestCephObjectStoreController(t *testing.T) { capnslog.SetGlobalLogLevel(capnslog.DEBUG) os.Setenv("ROOK_LOG_LEVEL", "DEBUG") - // A Pool resource with metadata and spec. - objectStore := &cephv1.CephObjectStore{ - ObjectMeta: metav1.ObjectMeta{ - Name: store, - Namespace: namespace, - }, - Spec: cephv1.ObjectStoreSpec{}, - TypeMeta: controllerTypeMeta, - } - objectStore.Spec.Gateway.Port = 80 + setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore { + // A CephObjectStore resource with metadata and spec. + objectStore := &cephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: store, + Namespace: namespace, + }, + Spec: cephv1.ObjectStoreSpec{}, + TypeMeta: controllerTypeMeta, + } + objectStore.Spec.Gateway.Port = 80 - // Objects to track in the fake client. - object := []runtime.Object{ - objectStore, - } + // Objects to track in the fake client. 
+ objects := []runtime.Object{ + objectStore, + } - executor := &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - return "", nil - }, - } - clientset := test.New(t, 3) - c := &clusterd.Context{ - Executor: executor, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: clientset, - } + for i := range additionalObjects { + objects = append(objects, additionalObjects[i]) + } - // Register operator types with the runtime scheme. - s := scheme.Scheme - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + return "", nil + }, + } + clientset := test.New(t, 3) + c := &clusterd.Context{ + Executor: executor, + RookClientset: rookclient.NewSimpleClientset(), + Clientset: clientset, + } - // Create a fake client to mock API calls. - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r := &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + // Register operator types with the runtime scheme. 
+ s := scheme.Scheme + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + + // Create a fake client to mock API calls. + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + // Create a ReconcileCephObjectStore object with the scheme and fake client. + r := &ReconcileCephObjectStore{ + client: cl, + scheme: s, + context: c, + objectStoreContexts: make(map[string]*objectStoreHealth), + recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + opManagerContext: context.TODO(), + } + + return r } // Mock request to simulate Reconcile() being called on an event for a @@ -340,48 +350,56 @@ func TestCephObjectStoreController(t *testing.T) { }, } - cephCluster := &cephv1.CephCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - Namespace: namespace, - }, - Status: cephv1.ClusterStatus{ - Phase: "", - CephStatus: &cephv1.CephStatus{ - Health: "", - }, - }, - } - currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) { return &cephver.Pacific, &cephver.Pacific, nil } t.Run("error - no ceph cluster", func(t *testing.T) { + r := setupNewEnvironment() + res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) }) t.Run("error - ceph cluster not ready", func(t *testing.T) { - object = append(object, cephCluster) - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. 
- r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: "", + CephStatus: &cephv1.CephStatus{ + Health: "", + }, + }, } + + r := setupNewEnvironment(cephCluster) + res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) }) - t.Run("success - object store is running", func(t *testing.T) { + // set up an environment that has a ready ceph cluster, and return the reconciler for it + setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: k8sutil.ReadyStatus, + CephStatus: &cephv1.CephStatus{ + Health: "HEALTH_OK", + }, + }, + } + + r := setupNewEnvironment(cephCluster) + secrets := map[string][]byte{ "fsid": []byte(name), "mon-secret": []byte("monsecret"), @@ -395,18 +413,11 @@ func TestCephObjectStoreController(t *testing.T) { Data: secrets, Type: k8sutil.RookType, } - _, err := c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) + _, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) assert.NoError(t, err) - // Add ready status to the CephCluster - cephCluster.Status.Phase = k8sutil.ReadyStatus - cephCluster.Status.CephStatus.Health = "HEALTH_OK" - - // Create a fake client to mock API calls. 
- cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Override executor with the new ceph status and more content - executor = &exectest.MockExecutor{ + executor := &exectest.MockExecutor{ MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { if args[0] == "status" { return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil @@ -452,21 +463,36 @@ func TestCephObjectStoreController(t *testing.T) { return "", nil }, } - c.Executor = executor + r.context.Executor = executor - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + return r + } + + t.Run("error - failed to start health checker", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + // cause a failure when creating the admin ops api for the health check + origHTTPClientFunc := genObjectStoreHTTPClientFunc + genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) { + return nil, []byte{}, errors.New("induced error creating admin ops API connection") } + defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }() + + _, err := r.Reconcile(ctx, req) + assert.Error(t, err) + // we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue) + assert.Contains(t, err.Error(), "failed to start rgw health checker") + assert.Contains(t, err.Error(), "induced error creating admin ops API connection") + }) + + t.Run("success - object store is running", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() res, err := 
r.Reconcile(ctx, req) assert.NoError(t, err) assert.False(t, res.Requeue) + + objectStore := &cephv1.CephObjectStore{} err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) assert.NoError(t, err) assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go index 058c5cdb4c71..3c9c0248823e 100644 --- a/pkg/operator/ceph/object/rgw.go +++ b/pkg/operator/ceph/object/rgw.go @@ -348,7 +348,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec) return tlsCert, nil } -func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { +// Allow overriding this function for unit tests to mock the admin ops api +var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient + +func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name) c := &http.Client{} tlsCert := []byte{}