From 6f9eb5eff32625978768457ceb6ed566c54be7bb Mon Sep 17 00:00:00 2001
From: Blaine Gardner
Date: Mon, 13 Sep 2021 17:38:28 -0600
Subject: [PATCH] ceph: retry object health check if creation fails

If the CephObjectStore health checker fails to be created, return a
reconcile failure so that the reconcile will be run again and Rook will
retry creating the health checker. This also means that Rook will not
list the CephObjectStore as ready if the health checker can't be
started.

Signed-off-by: Blaine Gardner
---
 pkg/operator/ceph/object/admin.go           |   2 +-
 pkg/operator/ceph/object/controller.go      |  18 +-
 pkg/operator/ceph/object/controller_test.go | 253 ++++++++++++++++++++
 pkg/operator/ceph/object/rgw.go             |   7 +-
 4 files changed, 271 insertions(+), 9 deletions(-)

diff --git a/pkg/operator/ceph/object/admin.go b/pkg/operator/ceph/object/admin.go
index 7124265ba475a..250c10f117506 100644
--- a/pkg/operator/ceph/object/admin.go
+++ b/pkg/operator/ceph/object/admin.go
@@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext(
 		return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user")
 	}
 
-	httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec)
+	httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/operator/ceph/object/controller.go b/pkg/operator/ceph/object/controller.go
index 24e48095b66df..85e3370f06e0d 100644
--- a/pkg/operator/ceph/object/controller.go
+++ b/pkg/operator/ceph/object/controller.go
@@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c
 
 	// Start monitoring
 	if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled {
-		r.startMonitoring(cephObjectStore, objContext, namespacedName)
+		err = r.startMonitoring(cephObjectStore, objContext, namespacedName)
+		if err != nil {
+			return reconcile.Result{}, err
+		}
 	}
 
 	return reconcile.Result{}, nil
@@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
 	return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
 }
 
-func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) {
+func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
 	// Start monitoring object store
 	if r.objectStoreChannels[objectstore.Name].monitoringRunning {
 		logger.Info("external rgw endpoint monitoring go routine already running!")
-		return
+		return nil
 	}
 
 	rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec)
 	if err != nil {
-		logger.Error(err)
-		return
+		return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String())
 	}
 
-	logger.Info("starting rgw healthcheck")
+	logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
 	go rgwChecker.checkObjectStore(r.objectStoreChannels[objectstore.Name].internalCtx)
+
+	// Set the monitoring flag so we don't start more than one go routine
+	r.objectStoreChannels[objectstore.Name].monitoringRunning = true
+
+	return nil
 }
diff --git a/pkg/operator/ceph/object/controller_test.go b/pkg/operator/ceph/object/controller_test.go
index 6c182cae860cc..a293bf8adc223 100644
--- a/pkg/operator/ceph/object/controller_test.go
+++ b/pkg/operator/ceph/object/controller_test.go
@@ -19,16 +19,21 @@ package object
 
 import (
"context" + "encoding/json" + "fmt" + "net/http" "os" "testing" "time" "github.com/coreos/pkg/capnslog" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/client/clientset/versioned/scheme" "github.com/rook/rook/pkg/clusterd" "github.com/rook/rook/pkg/daemon/ceph/client" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" cephver "github.com/rook/rook/pkg/operator/ceph/version" "github.com/rook/rook/pkg/operator/k8sutil" "github.com/rook/rook/pkg/operator/test" @@ -38,6 +43,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -648,3 +654,250 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) { err = r.client.Get(context.TODO(), req.NamespacedName, objectStore) assert.NoError(t, err) } + +func TestReconcileCephObjectStore_reconcileCreateObjectStore(t *testing.T) { + // create a Rook-Ceph scheme to use for our tests + scheme := runtime.NewScheme() + assert.NoError(t, cephv1.AddToScheme(scheme)) + + storeName := "my-store" + ns := "test-ns" + + nsName := types.NamespacedName{Namespace: ns, Name: storeName} + cluster := cephv1.ClusterSpec{} + + pools := []*client.CephStoragePoolSummary{ + {Name: "my-store.rgw.control"}, + {Name: "my-store.rgw.meta"}, + {Name: "my-store.rgw.log"}, + {Name: "my-store.rgw.buckets.non-ec"}, + {Name: "my-store.rgw.buckets.data"}, + {Name: ".rgw.root"}, + {Name: "my-store.rgw.buckets.index"}, + } + + osdGetPoolOutput := func(poolName string) string { + return fmt.Sprintf(`{ + "pool": "%s", + "pool_id": 5, + "size": 1, + "min_size": 1, + "pg_num": 8, + "pgp_num": 8, + "crush_rule": "%s", + "hashpspool": true, + "nodelete": false, + "nopgchange": false, + "nosizechange": false, + "write_fadvise_dontneed": false, + "noscrub": false, + "nodeep-scrub": false, + "use_gmt_hitset": true, + "fast_read": 0, + "compression_mode": "none", + "pg_autoscale_mode": "on", + "pg_num_min": 8 +}`, poolName, poolName) + } + + rgwAdminRealmGetOutput := `{ + "id": "29e44350-b208-4f25-a3ce-6a059ea559f2", + "name": "my-store", + "current_period": "0bd7f624-a22e-4779-a64d-913824d7e843", + "epoch": 2 +}` + + rgwAdminZoneGroupGetOutput := `{ + "id": "e03a68ee-f4a0-438c-826a-399ace923f2f", + "name": "my-store", + "api_name": "my-store", + "is_master": "true", + "endpoints": [ + "http://10.111.19.44:80" + ], + "hostnames": [], + "hostnames_s3website": [], + "master_zone": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", + "zones": [ + { + "id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", + "name": "my-store", + "endpoints": [ + "http://10.111.19.44:80" + ], + "log_meta": "false", + "log_data": "false", + "bucket_index_max_shards": 11, + "read_only": "false", + "tier_type": "", + "sync_from_all": "true", + "sync_from": [], + "redirect_zone": "" + } + ], + "placement_targets": [ + { + "name": "default-placement", + "tags": [], + "storage_classes": [ + "STANDARD" + ] + } + ], + "default_placement": "default-placement", + "realm_id": "29e44350-b208-4f25-a3ce-6a059ea559f2", + "sync_policy": { + "groups": [] + } +}` + + rgwAdminZoneGetOutput := `{ + "id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f", + "name": "my-store", + "domain_root": "my-store.rgw.meta:root", + "control_pool": "my-store.rgw.control", + "gc_pool": 
"my-store.rgw.log:gc", + "lc_pool": "my-store.rgw.log:lc", + "log_pool": "my-store.rgw.log", + "intent_log_pool": "my-store.rgw.log:intent", + "usage_log_pool": "my-store.rgw.log:usage", + "roles_pool": "my-store.rgw.meta:roles", + "reshard_pool": "my-store.rgw.log:reshard", + "user_keys_pool": "my-store.rgw.meta:users.keys", + "user_email_pool": "my-store.rgw.meta:users.email", + "user_swift_pool": "my-store.rgw.meta:users.swift", + "user_uid_pool": "my-store.rgw.meta:users.uid", + "otp_pool": "my-store.rgw.otp", + "system_key": { + "access_key": "", + "secret_key": "" + }, + "placement_pools": [ + { + "key": "default-placement", + "val": { + "index_pool": "my-store.rgw.buckets.index", + "storage_classes": { + "STANDARD": { + "data_pool": "my-store.rgw.buckets.data" + } + }, + "data_extra_pool": "my-store.rgw.buckets.non-ec", + "index_type": 0 + } + } + ], + "realm_id": "", + "notif_pool": "my-store.rgw.log:notif" +}` + + t.Run("health checker fails to be created", func(t *testing.T) { + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + t.Logf("Command: %s %v", command, args) + if args[0] == "osd" { + if args[1] == "lspools" { + output, err := json.Marshal(pools) + assert.Nil(t, err) + return string(output), nil + } + if args[1] == "pool" { + if args[2] == "get" { + poolName := args[3] + return osdGetPoolOutput(poolName), nil + } + if args[2] == "set" { + return "", nil // success + } + } + } + if args[0] == "config" { + if args[1] == "get" && args[3] == "rgw_rados_pool_pg_num_min" { + // ceph config get mon. rgw_rados_pool_pg_num_min + return "8", nil + } + if args[1] == "set" && args[2] == "client.rgw.my.store.a" { + return "", nil // success + } + } + if args[0] == "auth" && args[1] == "get-or-create-key" { + return `{"key":"mysecurekey"}`, nil + } + if args[0] == "dashboard" && args[1] == "get-rgw-api-access-key" { + return access_key, nil + } + panic(fmt.Sprintf("unhandled command %s %v", command, args)) + }, + MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) { + t.Logf("Command: %s %v", command, args) + if command == "radosgw-admin" { + if args[0] == "user" { + return userCreateJSON, nil + } + if args[0] == "realm" && args[1] == "get" { + return rgwAdminRealmGetOutput, nil + } + if args[0] == "zonegroup" && args[1] == "get" { + return rgwAdminZoneGroupGetOutput, nil + } + if args[0] == "zone" && args[1] == "get" { + return rgwAdminZoneGetOutput, nil + } + } + panic(fmt.Sprintf("unhandled command %s %v", command, args)) + }, + } + + objStore := &cephv1.CephObjectStore{ + TypeMeta: metav1.TypeMeta{ + Kind: "CephObjectStore", + APIVersion: "ceph.rook.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: storeName, + Namespace: ns, + }, + Spec: cephv1.ObjectStoreSpec{ + Gateway: cephv1.GatewaySpec{ + Port: 80, + }, + }, + } + + controllerRuntimeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objStore).Build() + + reconciler := ReconcileCephObjectStore{ + client: controllerRuntimeClient, + // bktclient bktclient.Interface + scheme: scheme, + // context *clusterd.Context + context: &clusterd.Context{ + Clientset: k8sfake.NewSimpleClientset(), + Executor: executor, + }, + clusterSpec: &cephv1.ClusterSpec{ + DataDirHostPath: "/var/lib/rook", + }, + clusterInfo: &cephclient.ClusterInfo{ + Namespace: ns, + }, + objectStoreChannels: map[string]*objectStoreHealth{ + storeName: { + monitoringRunning: false, + }, + }, + // recorder 
+		}
+
+		origHTTPClientFunc := genObjectStoreHTTPClientFunc
+		genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) {
+			return nil, []byte{}, errors.New("induced error creating admin ops API connection")
+		}
+		defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }()
+
+		_, err := reconciler.reconcileCreateObjectStore(objStore, nsName, cluster)
+		assert.Error(t, err)
+		assert.Contains(t, err.Error(), "failed to start rgw health checker")
+		assert.Contains(t, err.Error(), "induced error creating admin ops API connection")
+	})
+}
diff --git a/pkg/operator/ceph/object/rgw.go b/pkg/operator/ceph/object/rgw.go
index 058c5cdb4c71e..d0ab7a693d763 100644
--- a/pkg/operator/ceph/object/rgw.go
+++ b/pkg/operator/ceph/object/rgw.go
@@ -348,10 +348,13 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec)
 	return tlsCert, nil
 }
 
-func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
+// Allow overriding this function for unit tests to mock the admin ops api
+var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient
+
+func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) {
 	nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name)
 	c := &http.Client{}
-	tlsCert := []byte{}
+	tlsCert = []byte{}
 	if spec.IsTLSEnabled() {
 		var err error
 		tlsCert, err = GetTlsCaCert(objContext, spec)