diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go index 7124265ba475a..250c10f117506 100644 --- a/pkg/operator/ceph/object/admin.go +++ b/pkg/operator/ceph/object/admin.go @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext( return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user") } - httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec) + httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec) if err != nil { return nil, err } diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go index 24e48095b66df..85e3370f06e0d 100644 --- a/pkg/operator/ceph/object/controller.go +++ b/pkg/operator/ceph/object/controller.go @@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c // Start monitoring if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled { - r.startMonitoring(cephObjectStore, objContext, namespacedName) + err = r.startMonitoring(cephObjectStore, objContext, namespacedName) + if err != nil { + return reconcile.Result{}, err + } } return reconcile.Result{}, nil @@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1 return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil } -func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) { +func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error { // Start monitoring object store if r.objectStoreContexts[objectstore.Name].started { logger.Info("external rgw endpoint monitoring go routine already running!") - return + return nil } rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec) if err != nil { - logger.Error(err) - return + return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String()) } - logger.Info("starting rgw healthcheck") + logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String()) go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx) + + // Set the monitoring flag so we don't start more than one go routine r.objectStoreContexts[objectstore.Name].started = true + + return nil } diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go index 6c182cae860cc..ab3ae9309e828 100644 --- a/pkg/operator/ceph/object/controller_test.go +++ b/pkg/operator/ceph/object/controller_test.go @@ -19,11 +19,13 @@ package object import ( "context" + "net/http" "os" "testing" "time" "github.com/coreos/pkg/capnslog" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/client/clientset/versioned/scheme" @@ -283,52 +285,60 @@ func TestCephObjectStoreController(t *testing.T) { capnslog.SetGlobalLogLevel(capnslog.DEBUG) os.Setenv("ROOK_LOG_LEVEL", "DEBUG") - // A Pool resource with metadata and spec. - objectStore := &cephv1.CephObjectStore{ - ObjectMeta: metav1.ObjectMeta{ - Name: store, - Namespace: namespace, - }, - Spec: cephv1.ObjectStoreSpec{}, - TypeMeta: controllerTypeMeta, - } - objectStore.Spec.Gateway.Port = 80 + setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore { + // A Pool resource with metadata and spec. + objectStore := &cephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: store, + Namespace: namespace, + }, + Spec: cephv1.ObjectStoreSpec{}, + TypeMeta: controllerTypeMeta, + } + objectStore.Spec.Gateway.Port = 80 - // Objects to track in the fake client. - object := []runtime.Object{ - objectStore, - } + // Objects to track in the fake client. + objects := []runtime.Object{ + objectStore, + } - executor := &exectest.MockExecutor{ - MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - if args[0] == "status" { - return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil - } - return "", nil - }, - } - clientset := test.New(t, 3) - c := &clusterd.Context{ - Executor: executor, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: clientset, - } + for i := range additionalObjects { + objects = append(objects, additionalObjects[i]) + } - // Register operator types with the runtime scheme. - s := scheme.Scheme - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) - s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + if args[0] == "status" { + return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil + } + return "", nil + }, + } + clientset := test.New(t, 3) + c := &clusterd.Context{ + Executor: executor, + RookClientset: rookclient.NewSimpleClientset(), + Clientset: clientset, + } - // Create a fake client to mock API calls. - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r := &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + // Register operator types with the runtime scheme. + s := scheme.Scheme + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{}) + s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}) + + // Create a fake client to mock API calls. + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + // Create a ReconcileCephObjectStore object with the scheme and fake client. + r := &ReconcileCephObjectStore{ + client: cl, + scheme: s, + context: c, + objectStoreContexts: make(map[string]*objectStoreHealth), + recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), + opManagerContext: context.TODO(), + } + + return r } // Mock request to simulate Reconcile() being called on an event for a @@ -340,48 +350,56 @@ func TestCephObjectStoreController(t *testing.T) { }, } - cephCluster := &cephv1.CephCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespace, - Namespace: namespace, - }, - Status: cephv1.ClusterStatus{ - Phase: "", - CephStatus: &cephv1.CephStatus{ - Health: "", - }, - }, - } - currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) { return &cephver.Pacific, &cephver.Pacific, nil } t.Run("error - no ceph cluster", func(t *testing.T) { + r := setupNewEnvironment() + res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) }) t.Run("error - ceph cluster not ready", func(t *testing.T) { - object = append(object, cephCluster) - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: "", + CephStatus: &cephv1.CephStatus{ + Health: "", + }, + }, } + + r := setupNewEnvironment(cephCluster) + res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) }) - t.Run("success - object store is running", func(t *testing.T) { + // set up an environment that has a ready ceph cluster, and return the reconciler for it + setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore { + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + Namespace: namespace, + }, + Status: cephv1.ClusterStatus{ + Phase: k8sutil.ReadyStatus, + CephStatus: &cephv1.CephStatus{ + Health: "HEALTH_OK", + }, + }, + } + + r := setupNewEnvironment(cephCluster) + secrets := map[string][]byte{ "fsid": []byte(name), "mon-secret": []byte("monsecret"), @@ -395,18 +413,11 @@ func TestCephObjectStoreController(t *testing.T) { Data: secrets, Type: k8sutil.RookType, } - _, err := c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) + _, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) assert.NoError(t, err) - // Add ready status to the CephCluster - cephCluster.Status.Phase = k8sutil.ReadyStatus - cephCluster.Status.CephStatus.Health = "HEALTH_OK" - - // Create a fake client to mock API calls. - cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() - // Override executor with the new ceph status and more content - executor = &exectest.MockExecutor{ + executor := &exectest.MockExecutor{ MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { if args[0] == "status" { return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil @@ -452,21 +463,34 @@ func TestCephObjectStoreController(t *testing.T) { return "", nil }, } - c.Executor = executor + r.context.Executor = executor - // Create a ReconcileCephObjectStore object with the scheme and fake client. - r = &ReconcileCephObjectStore{ - client: cl, - scheme: s, - context: c, - objectStoreContexts: make(map[string]*objectStoreHealth), - recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)), - opManagerContext: context.TODO(), + return r + } + + t.Run("error - failed to start health checker", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() + + // cause a failure when creating the admin ops api for the health check + origHTTPClientFunc := genObjectStoreHTTPClientFunc + genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) { + return nil, []byte{}, errors.New("induced error creating admin ops API connection") } + defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }() + + _, err := r.Reconcile(ctx, req) + assert.Error(t, err) + // we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue) + }) + + t.Run("success - object store is running", func(t *testing.T) { + r := setupEnvironmentWithReadyCephCluster() res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.False(t, res.Requeue) + + objectStore := &cephv1.CephObjectStore{} err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) assert.NoError(t, err) assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore) @@ -648,3 +672,250 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) { err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) assert.NoError(t, err) } + +// func TestReconcileCephObjectStore_reconcileCreateObjectStore(t *testing.T) { +// // create a Rook-Ceph scheme to use for our tests +// scheme := runtime.NewScheme() +// assert.NoError(t, cephv1.AddToScheme(scheme)) + +// storeName := "my-store" +// ns := "test-ns" + +// nsName := types.NamespacedName{Namespace: ns, Name: storeName} +// cluster := cephv1.ClusterSpec{} + +// pools := []*client.CephStoragePoolSummary{ +// {Name: "my-store.rgw.control"}, +// {Name: "my-store.rgw.meta"}, +// {Name: "my-store.rgw.log"}, +// {Name: "my-store.rgw.buckets.non-ec"}, +// {Name: "my-store.rgw.buckets.data"}, +// {Name: ".rgw.root"}, +// {Name: "my-store.rgw.buckets.index"}, +// } + +// osdGetPoolOutput := func(poolName string) string { +// return fmt.Sprintf(`{ +// "pool": "%s", +// "pool_id": 5, +// "size": 1, +// "min_size": 1, +// "pg_num": 8, +// "pgp_num": 8, +// "crush_rule": "%s", +// "hashpspool": true, +// "nodelete": false, +// "nopgchange": false, +// "nosizechange": false, +// "write_fadvise_dontneed": false, +// "noscrub": false, +// "nodeep-scrub": false, +// "use_gmt_hitset": true, +// "fast_read": 0, +// "compression_mode": "none", +// "pg_autoscale_mode": "on", +// "pg_num_min": 8 +// }`, poolName, poolName) +// } + +// rgwAdminRealmGetOutput := `{ +// "id": "29e44350-b208-4f25-a3ce-6a059ea559f2", +// "name": "my-store", +// "current_period": "0bd7f624-a22e-4779-a64d-913824d7e843", +// "epoch": 2 +// }` + +// rgwAdminZoneGroupGetOutput := `{ +// "id": "e03a68ee-f4a0-438c-826a-399ace923f2f", +// "name": "my-store", +// "api_name": "my-store", +// "is_master": "true", +// "endpoints": [ +// "http://10.111.19.44:80" +// ], +// "hostnames": [], +// "hostnames_s3website": [], +// "master_zone": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", +// "zones": [ +// { +// "id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", +// "name": "my-store", +// "endpoints": [ +// "http://10.111.19.44:80" +// ], +// "log_meta": "false", +// "log_data": "false", +// "bucket_index_max_shards": 11, +// "read_only": "false", +// "tier_type": "", +// "sync_from_all": "true", +// "sync_from": [], +// "redirect_zone": "" +// } +// ], +// "placement_targets": [ +// { +// "name": "default-placement", +// "tags": [], +// "storage_classes": [ +// "STANDARD" +// ] +// } +// ], +// "default_placement": "default-placement", +// "realm_id": "29e44350-b208-4f25-a3ce-6a059ea559f2", +// "sync_policy": { +// "groups": [] +// } +// }` + +// rgwAdminZoneGetOutput := `{ +// "id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", +// "name": "my-store", +// "domain_root": "my-store.rgw.meta:root", +// "control_pool": "my-store.rgw.control", +// "gc_pool": "my-store.rgw.log:gc", +// "lc_pool": "my-store.rgw.log:lc", +// "log_pool": "my-store.rgw.log", +// "intent_log_pool": "my-store.rgw.log:intent", +// "usage_log_pool": "my-store.rgw.log:usage", +// "roles_pool": "my-store.rgw.meta:roles", +// "reshard_pool": "my-store.rgw.log:reshard", +// "user_keys_pool": "my-store.rgw.meta:users.keys", +// "user_email_pool": "my-store.rgw.meta:users.email", +// "user_swift_pool": "my-store.rgw.meta:users.swift", +// "user_uid_pool": "my-store.rgw.meta:users.uid", +// "otp_pool": "my-store.rgw.otp", +// "system_key": { +// "access_key": "", +// "secret_key": "" +// }, +// "placement_pools": [ +// { +// "key": "default-placement", +// "val": { +// "index_pool": "my-store.rgw.buckets.index", +// "storage_classes": { +// "STANDARD": { +// "data_pool": "my-store.rgw.buckets.data" +// } +// }, +// "data_extra_pool": "my-store.rgw.buckets.non-ec", +// "index_type": 0 +// } +// } +// ], +// "realm_id": "", +// "notif_pool": "my-store.rgw.log:notif" +// }` + +// t.Run("health checker fails to be created", func(t *testing.T) { +// executor := &exectest.MockExecutor{ +// MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { +// t.Logf("Command: %s %v", command, args) +// if args[0] == "osd" { +// if args[1] == "lspools" { +// output, err := json.Marshal(pools) +// assert.Nil(t, err) +// return string(output), nil +// } +// if args[1] == "pool" { +// if args[2] == "get" { +// poolName := args[3] +// return osdGetPoolOutput(poolName), nil +// } +// if args[2] == "set" { +// return "", nil // success +// } +// } +// } +// if args[0] == "config" { +// if args[1] == "get" && args[3] == "rgw_rados_pool_pg_num_min" { +// // ceph config get mon. rgw_rados_pool_pg_num_min +// return "8", nil +// } +// if args[1] == "set" && args[2] == "client.rgw.my.store.a" { +// return "", nil // success +// } +// } +// if args[0] == "auth" && args[1] == "get-or-create-key" { +// return `{"key":"mysecurekey"}`, nil +// } +// if args[0] == "dashboard" && args[1] == "get-rgw-api-access-key" { +// return access_key, nil +// } +// panic(fmt.Sprintf("unhandled command %s %v", command, args)) +// }, +// MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { +// t.Logf("Command: %s %v", command, args) +// if command == "radosgw-admin" { +// if args[0] == "user" { +// return userCreateJSON, nil +// } +// if args[0] == "realm" && args[1] == "get" { +// return rgwAdminRealmGetOutput, nil +// } +// if args[0] == "zonegroup" && args[1] == "get" { +// return rgwAdminZoneGroupGetOutput, nil +// } +// if args[0] == "zone" && args[1] == "get" { +// return rgwAdminZoneGetOutput, nil +// } +// } +// panic(fmt.Sprintf("unhandled command %s %v", command, args)) +// }, +// } + +// objStore := &cephv1.CephObjectStore{ +// TypeMeta: metav1.TypeMeta{ +// Kind: "CephObjectStore", +// APIVersion: "ceph.rook.io/v1", +// }, +// ObjectMeta: metav1.ObjectMeta{ +// Name: storeName, +// Namespace: ns, +// }, +// Spec: cephv1.ObjectStoreSpec{ +// Gateway: cephv1.GatewaySpec{ +// Port: 80, +// }, +// }, +// } + +// controllerRuntimeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objStore).Build() + +// reconciler := ReconcileCephObjectStore{ +// client: controllerRuntimeClient, +// // bktclient bktclient.Interface +// scheme: scheme, +// // context *clusterd.Context +// context: &clusterd.Context{ +// Clientset: k8sfake.NewSimpleClientset(), +// Executor: executor, +// }, +// clusterSpec: &cephv1.ClusterSpec{ +// DataDirHostPath: "/var/lib/rook", +// }, +// clusterInfo: &cephclient.ClusterInfo{ +// Namespace: ns, +// }, +// objectStoreChannels: map[string]*objectStoreHealth{ +// storeName: { +// monitoringRunning: false, +// }, +// }, +// // recorder *k8sutil.EventReporter +// } + +// origHTTPClientFunc := genObjectStoreHTTPClientFunc +// genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) { +// return nil, []byte{}, errors.New("induced error creating admin ops API connection") +// } +// defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }() + +// _, err := reconciler.reconcileCreateObjectStore(objStore, nsName, cluster) +// assert.Error(t, err) +// assert.Contains(t, err.Error(), "failed to start rgw health checker") +// assert.Contains(t, err.Error(), "induced error creating admin ops API connection") +// }) +// } diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go index 058c5cdb4c71e..3c9c0248823e9 100644 --- a/pkg/operator/ceph/object/rgw.go +++ b/pkg/operator/ceph/object/rgw.go @@ -348,7 +348,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec) return tlsCert, nil } -func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { +// Allow overriding this function for unit tests to mock the admin ops api +var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient + +func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) { nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name) c := &http.Client{} tlsCert := []byte{}