Skip to content

Commit

Permalink
ceph: retry object health check if creation fails
Browse files Browse the repository at this point in the history
If the CephObjectStore health checker fails to be created, return a
reconcile failure so that the reconcile will be run again and Rook will
retry creating the health checker. This also means that Rook will not
list the CephObjectStore as ready if the health checker can't be
started.

Signed-off-by: Blaine Gardner <blaine.gardner@redhat.com>
  • Loading branch information
BlaineEXE committed Sep 17, 2021
1 parent 0d8fd9d commit 7b4f2e6
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/admin.go
Expand Up @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext(
return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user")
}

httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec)
httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec)
if err != nil {
return nil, err
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/operator/ceph/object/controller.go
Expand Up @@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c

// Start monitoring
if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled {
r.startMonitoring(cephObjectStore, objContext, namespacedName)
err = r.startMonitoring(cephObjectStore, objContext, namespacedName)
if err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) {
func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return
return nil
}

rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec)
if err != nil {
logger.Error(err)
return
return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String())
}

logger.Info("starting rgw healthcheck")
logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true

return nil
}
253 changes: 253 additions & 0 deletions pkg/operator/ceph/object/controller_test.go
Expand Up @@ -19,16 +19,21 @@ package object

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/client"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
cephver "github.com/rook/rook/pkg/operator/ceph/version"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/operator/test"
Expand All @@ -38,6 +43,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -648,3 +654,250 @@ func TestCephObjectStoreControllerMultisite(t *testing.T) {
err = r.client.Get(context.TODO(), req.NamespacedName, objectStore)
assert.NoError(t, err)
}

func TestReconcileCephObjectStore_reconcileCreateObjectStore(t *testing.T) {
// create a Rook-Ceph scheme to use for our tests
scheme := runtime.NewScheme()
assert.NoError(t, cephv1.AddToScheme(scheme))

storeName := "my-store"
ns := "test-ns"

nsName := types.NamespacedName{Namespace: ns, Name: storeName}
cluster := cephv1.ClusterSpec{}

pools := []*client.CephStoragePoolSummary{
{Name: "my-store.rgw.control"},
{Name: "my-store.rgw.meta"},
{Name: "my-store.rgw.log"},
{Name: "my-store.rgw.buckets.non-ec"},
{Name: "my-store.rgw.buckets.data"},
{Name: ".rgw.root"},
{Name: "my-store.rgw.buckets.index"},
}

osdGetPoolOutput := func(poolName string) string {
return fmt.Sprintf(`{
"pool": "%s",
"pool_id": 5,
"size": 1,
"min_size": 1,
"pg_num": 8,
"pgp_num": 8,
"crush_rule": "%s",
"hashpspool": true,
"nodelete": false,
"nopgchange": false,
"nosizechange": false,
"write_fadvise_dontneed": false,
"noscrub": false,
"nodeep-scrub": false,
"use_gmt_hitset": true,
"fast_read": 0,
"compression_mode": "none",
"pg_autoscale_mode": "on",
"pg_num_min": 8
}`, poolName, poolName)
}

rgwAdminRealmGetOutput := `{
"id": "29e44350-b208-4f25-a3ce-6a059ea559f2",
"name": "my-store",
"current_period": "0bd7f624-a22e-4779-a64d-913824d7e843",
"epoch": 2
}`

rgwAdminZoneGroupGetOutput := `{
"id": "e03a68ee-f4a0-438c-826a-399ace923f2f",
"name": "my-store",
"api_name": "my-store",
"is_master": "true",
"endpoints": [
"http://10.111.19.44:80"
],
"hostnames": [],
"hostnames_s3website": [],
"master_zone": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f",
"zones": [
{
"id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f",
"name": "my-store",
"endpoints": [
"http://10.111.19.44:80"
],
"log_meta": "false",
"log_data": "false",
"bucket_index_max_shards": 11,
"read_only": "false",
"tier_type": "",
"sync_from_all": "true",
"sync_from": [],
"redirect_zone": ""
}
],
"placement_targets": [
{
"name": "default-placement",
"tags": [],
"storage_classes": [
"STANDARD"
]
}
],
"default_placement": "default-placement",
"realm_id": "29e44350-b208-4f25-a3ce-6a059ea559f2",
"sync_policy": {
"groups": []
}
}`

rgwAdminZoneGetOutput := `{
"id": "5d4f36ea-b4b4-405c-8b60-0c7bfc71072f",
"name": "my-store",
"domain_root": "my-store.rgw.meta:root",
"control_pool": "my-store.rgw.control",
"gc_pool": "my-store.rgw.log:gc",
"lc_pool": "my-store.rgw.log:lc",
"log_pool": "my-store.rgw.log",
"intent_log_pool": "my-store.rgw.log:intent",
"usage_log_pool": "my-store.rgw.log:usage",
"roles_pool": "my-store.rgw.meta:roles",
"reshard_pool": "my-store.rgw.log:reshard",
"user_keys_pool": "my-store.rgw.meta:users.keys",
"user_email_pool": "my-store.rgw.meta:users.email",
"user_swift_pool": "my-store.rgw.meta:users.swift",
"user_uid_pool": "my-store.rgw.meta:users.uid",
"otp_pool": "my-store.rgw.otp",
"system_key": {
"access_key": "",
"secret_key": ""
},
"placement_pools": [
{
"key": "default-placement",
"val": {
"index_pool": "my-store.rgw.buckets.index",
"storage_classes": {
"STANDARD": {
"data_pool": "my-store.rgw.buckets.data"
}
},
"data_extra_pool": "my-store.rgw.buckets.non-ec",
"index_type": 0
}
}
],
"realm_id": "",
"notif_pool": "my-store.rgw.log:notif"
}`

t.Run("health checker fails to be created", func(t *testing.T) {
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
t.Logf("Command: %s %v", command, args)
if args[0] == "osd" {
if args[1] == "lspools" {
output, err := json.Marshal(pools)
assert.Nil(t, err)
return string(output), nil
}
if args[1] == "pool" {
if args[2] == "get" {
poolName := args[3]
return osdGetPoolOutput(poolName), nil
}
if args[2] == "set" {
return "", nil // success
}
}
}
if args[0] == "config" {
if args[1] == "get" && args[3] == "rgw_rados_pool_pg_num_min" {
// ceph config get mon. rgw_rados_pool_pg_num_min
return "8", nil
}
if args[1] == "set" && args[2] == "client.rgw.my.store.a" {
return "", nil // success
}
}
if args[0] == "auth" && args[1] == "get-or-create-key" {
return `{"key":"mysecurekey"}`, nil
}
if args[0] == "dashboard" && args[1] == "get-rgw-api-access-key" {
return access_key, nil
}
panic(fmt.Sprintf("unhandled command %s %v", command, args))
},
MockExecuteCommandWithTimeout: func(timeout time.Duration, command string, args ...string) (string, error) {
t.Logf("Command: %s %v", command, args)
if command == "radosgw-admin" {
if args[0] == "user" {
return userCreateJSON, nil
}
if args[0] == "realm" && args[1] == "get" {
return rgwAdminRealmGetOutput, nil
}
if args[0] == "zonegroup" && args[1] == "get" {
return rgwAdminZoneGroupGetOutput, nil
}
if args[0] == "zone" && args[1] == "get" {
return rgwAdminZoneGetOutput, nil
}
}
panic(fmt.Sprintf("unhandled command %s %v", command, args))
},
}

objStore := &cephv1.CephObjectStore{
TypeMeta: metav1.TypeMeta{
Kind: "CephObjectStore",
APIVersion: "ceph.rook.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: storeName,
Namespace: ns,
},
Spec: cephv1.ObjectStoreSpec{
Gateway: cephv1.GatewaySpec{
Port: 80,
},
},
}

controllerRuntimeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objStore).Build()

reconciler := ReconcileCephObjectStore{
client: controllerRuntimeClient,
// bktclient bktclient.Interface
scheme: scheme,
// context *clusterd.Context
context: &clusterd.Context{
Clientset: k8sfake.NewSimpleClientset(),
Executor: executor,
},
clusterSpec: &cephv1.ClusterSpec{
DataDirHostPath: "/var/lib/rook",
},
clusterInfo: &cephclient.ClusterInfo{
Namespace: ns,
},
objectStoreChannels: map[string]*objectStoreHealth{
storeName: {
monitoringRunning: false,
},
},
// recorder *k8sutil.EventReporter
}

origHTTPClientFunc := genObjectStoreHTTPClientFunc
genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) {
return nil, []byte{}, errors.New("induced error creating admin ops API connection")
}
defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }()

_, err := reconciler.reconcileCreateObjectStore(objStore, nsName, cluster)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to start rgw health checker")
assert.Contains(t, err.Error(), "induced error creating admin ops API connection")
})
}
5 changes: 4 additions & 1 deletion pkg/operator/ceph/object/rgw.go
Expand Up @@ -348,7 +348,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec)
return tlsCert, nil
}

func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
// Allow overriding this function for unit tests to mock the admin ops api
var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient

func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name)
c := &http.Client{}
tlsCert := []byte{}
Expand Down

0 comments on commit 7b4f2e6

Please sign in to comment.