Skip to content

Commit

Permalink
Merge pull request #8708 from BlaineEXE/health-checker-failure-should-fail-reconcile
Browse files (browse the repository at this point in the history)

ceph: retry object health check if creation fails
  • Loading branch information
BlaineEXE committed Sep 20, 2021
2 parents 8755759 + 5383ba2 commit acbda93
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/admin.go
Expand Up @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext(
return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user")
}

httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec)
httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec)
if err != nil {
return nil, err
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/operator/ceph/object/controller.go
Expand Up @@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c

// Start monitoring
if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled {
r.startMonitoring(cephObjectStore, objContext, namespacedName)
err = r.startMonitoring(cephObjectStore, objContext, namespacedName)
if err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) {
func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return
return nil
}

rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec)
if err != nil {
logger.Error(err)
return
return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String())
}

logger.Info("starting rgw healthcheck")
logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true

return nil
}
196 changes: 111 additions & 85 deletions pkg/operator/ceph/object/controller_test.go
Expand Up @@ -19,11 +19,13 @@ package object

import (
"context"
"net/http"
"os"
"testing"
"time"

"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -283,52 +285,60 @@ func TestCephObjectStoreController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

// A Pool resource with metadata and spec.
objectStore := &cephv1.CephObjectStore{
ObjectMeta: metav1.ObjectMeta{
Name: store,
Namespace: namespace,
},
Spec: cephv1.ObjectStoreSpec{},
TypeMeta: controllerTypeMeta,
}
objectStore.Spec.Gateway.Port = 80
setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore {
// A Pool resource with metadata and spec.
objectStore := &cephv1.CephObjectStore{
ObjectMeta: metav1.ObjectMeta{
Name: store,
Namespace: namespace,
},
Spec: cephv1.ObjectStoreSpec{},
TypeMeta: controllerTypeMeta,
}
objectStore.Spec.Gateway.Port = 80

// Objects to track in the fake client.
object := []runtime.Object{
objectStore,
}
// Objects to track in the fake client.
objects := []runtime.Object{
objectStore,
}

executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
}
return "", nil
},
}
clientset := test.New(t, 3)
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset(),
Clientset: clientset,
}
for i := range additionalObjects {
objects = append(objects, additionalObjects[i])
}

// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{})
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{})
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
}
return "", nil
},
}
clientset := test.New(t, 3)
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset(),
Clientset: clientset,
}

// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r := &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{})
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{})

// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r := &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
}

return r
}

// Mock request to simulate Reconcile() being called on an event for a
Expand All @@ -340,48 +350,56 @@ func TestCephObjectStoreController(t *testing.T) {
},
}

cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: "",
CephStatus: &cephv1.CephStatus{
Health: "",
},
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

t.Run("error - no ceph cluster", func(t *testing.T) {
r := setupNewEnvironment()

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
})

t.Run("error - ceph cluster not ready", func(t *testing.T) {
object = append(object, cephCluster)
// Create a fake client to mock API calls.
cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r = &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: "",
CephStatus: &cephv1.CephStatus{
Health: "",
},
},
}

r := setupNewEnvironment(cephCluster)

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
})

t.Run("success - object store is running", func(t *testing.T) {
// set up an environment that has a ready ceph cluster, and return the reconciler for it
setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore {
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: k8sutil.ReadyStatus,
CephStatus: &cephv1.CephStatus{
Health: "HEALTH_OK",
},
},
}

r := setupNewEnvironment(cephCluster)

secrets := map[string][]byte{
"fsid": []byte(name),
"mon-secret": []byte("monsecret"),
Expand All @@ -395,18 +413,11 @@ func TestCephObjectStoreController(t *testing.T) {
Data: secrets,
Type: k8sutil.RookType,
}
_, err := c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
_, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
assert.NoError(t, err)

// Add ready status to the CephCluster
cephCluster.Status.Phase = k8sutil.ReadyStatus
cephCluster.Status.CephStatus.Health = "HEALTH_OK"

// Create a fake client to mock API calls.
cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()

// Override executor with the new ceph status and more content
executor = &exectest.MockExecutor{
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
Expand Down Expand Up @@ -452,21 +463,36 @@ func TestCephObjectStoreController(t *testing.T) {
return "", nil
},
}
c.Executor = executor
r.context.Executor = executor

// Create a ReconcileCephObjectStore object with the scheme and fake client.
r = &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
return r
}

t.Run("error - failed to start health checker", func(t *testing.T) {
r := setupEnvironmentWithReadyCephCluster()

// cause a failure when creating the admin ops api for the health check
origHTTPClientFunc := genObjectStoreHTTPClientFunc
genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) {
return nil, []byte{}, errors.New("induced error creating admin ops API connection")
}
defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }()

_, err := r.Reconcile(ctx, req)
assert.Error(t, err)
// we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue)
assert.Contains(t, err.Error(), "failed to start rgw health checker")
assert.Contains(t, err.Error(), "induced error creating admin ops API connection")
})

t.Run("success - object store is running", func(t *testing.T) {
r := setupEnvironmentWithReadyCephCluster()

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.False(t, res.Requeue)

objectStore := &cephv1.CephObjectStore{}
err = r.client.Get(context.TODO(), req.NamespacedName, objectStore)
assert.NoError(t, err)
assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore)
Expand Down
5 changes: 4 additions & 1 deletion pkg/operator/ceph/object/rgw.go
Expand Up @@ -348,7 +348,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec)
return tlsCert, nil
}

func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
// Allow overriding this function for unit tests to mock the admin ops api
var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient

func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name)
c := &http.Client{}
tlsCert := []byte{}
Expand Down

0 comments on commit acbda93

Please sign in to comment.