Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ceph: retry object health check if creation fails #8708

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/admin.go
Expand Up @@ -150,7 +150,7 @@ func NewMultisiteAdminOpsContext(
return nil, errors.Wrapf(err, "failed to create or retrieve rgw admin ops user")
}

httpClient, tlsCert, err := GenObjectStoreHTTPClient(objContext, spec)
httpClient, tlsCert, err := genObjectStoreHTTPClientFunc(objContext, spec)
if err != nil {
return nil, err
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/operator/ceph/object/controller.go
Expand Up @@ -448,7 +448,10 @@ func (r *ReconcileCephObjectStore) reconcileCreateObjectStore(cephObjectStore *c

// Start monitoring
if !cephObjectStore.Spec.HealthCheck.Bucket.Disabled {
r.startMonitoring(cephObjectStore, objContext, namespacedName)
err = r.startMonitoring(cephObjectStore, objContext, namespacedName)
if err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -513,20 +516,23 @@ func (r *ReconcileCephObjectStore) reconcileMultisiteCRs(cephObjectStore *cephv1
return cephObjectStore.Name, cephObjectStore.Name, cephObjectStore.Name, reconcile.Result{}, nil
}

func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) {
func (r *ReconcileCephObjectStore) startMonitoring(objectstore *cephv1.CephObjectStore, objContext *Context, namespacedName types.NamespacedName) error {
// Start monitoring object store
if r.objectStoreContexts[objectstore.Name].started {
logger.Info("external rgw endpoint monitoring go routine already running!")
return
return nil
}

rgwChecker, err := newBucketChecker(r.context, objContext, r.client, namespacedName, &objectstore.Spec)
if err != nil {
logger.Error(err)
return
return errors.Wrapf(err, "failed to start rgw health checker for CephObjectStore %q, will re-reconcile", namespacedName.String())
}

logger.Info("starting rgw healthcheck")
logger.Infof("starting rgw health checker for CephObjectStore %q", namespacedName.String())
go rgwChecker.checkObjectStore(r.objectStoreContexts[objectstore.Name].internalCtx)

// Set the monitoring flag so we don't start more than one go routine
r.objectStoreContexts[objectstore.Name].started = true

return nil
}
196 changes: 111 additions & 85 deletions pkg/operator/ceph/object/controller_test.go
Expand Up @@ -19,11 +19,13 @@ package object

import (
"context"
"net/http"
"os"
"testing"
"time"

"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
Expand Down Expand Up @@ -283,52 +285,60 @@ func TestCephObjectStoreController(t *testing.T) {
capnslog.SetGlobalLogLevel(capnslog.DEBUG)
os.Setenv("ROOK_LOG_LEVEL", "DEBUG")

// A Pool resource with metadata and spec.
objectStore := &cephv1.CephObjectStore{
ObjectMeta: metav1.ObjectMeta{
Name: store,
Namespace: namespace,
},
Spec: cephv1.ObjectStoreSpec{},
TypeMeta: controllerTypeMeta,
}
objectStore.Spec.Gateway.Port = 80
setupNewEnvironment := func(additionalObjects ...runtime.Object) *ReconcileCephObjectStore {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this function to setup a new test environment ReconcileCephObjectStore so each test is independent.

// A Pool resource with metadata and spec.
objectStore := &cephv1.CephObjectStore{
ObjectMeta: metav1.ObjectMeta{
Name: store,
Namespace: namespace,
},
Spec: cephv1.ObjectStoreSpec{},
TypeMeta: controllerTypeMeta,
}
objectStore.Spec.Gateway.Port = 80

// Objects to track in the fake client.
object := []runtime.Object{
objectStore,
}
// Objects to track in the fake client.
objects := []runtime.Object{
objectStore,
}

executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
}
return "", nil
},
}
clientset := test.New(t, 3)
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset(),
Clientset: clientset,
}
for i := range additionalObjects {
objects = append(objects, additionalObjects[i])
}

// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{})
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{})
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_ERR"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
}
return "", nil
},
}
clientset := test.New(t, 3)
c := &clusterd.Context{
Executor: executor,
RookClientset: rookclient.NewSimpleClientset(),
Clientset: clientset,
}

// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r := &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
// Register operator types with the runtime scheme.
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephObjectStore{})
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{})

// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r := &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
}

return r
}

// Mock request to simulate Reconcile() being called on an event for a
Expand All @@ -340,48 +350,56 @@ func TestCephObjectStoreController(t *testing.T) {
},
}

cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: "",
CephStatus: &cephv1.CephStatus{
Health: "",
},
},
}

currentAndDesiredCephVersion = func(rookImage string, namespace string, jobName string, ownerInfo *k8sutil.OwnerInfo, context *clusterd.Context, cephClusterSpec *cephv1.ClusterSpec, clusterInfo *client.ClusterInfo) (*cephver.CephVersion, *cephver.CephVersion, error) {
return &cephver.Pacific, &cephver.Pacific, nil
}

t.Run("error - no ceph cluster", func(t *testing.T) {
r := setupNewEnvironment()

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
})

t.Run("error - ceph cluster not ready", func(t *testing.T) {
object = append(object, cephCluster)
// Create a fake client to mock API calls.
cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
// Create a ReconcileCephObjectStore object with the scheme and fake client.
r = &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: "",
CephStatus: &cephv1.CephStatus{
Health: "",
},
},
}

r := setupNewEnvironment(cephCluster)

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
})

t.Run("success - object store is running", func(t *testing.T) {
// set up an environment that has a ready ceph cluster, and return the reconciler for it
setupEnvironmentWithReadyCephCluster := func() *ReconcileCephObjectStore {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this additional function to help set up test environment ReconcileCephObjectStores where the CephCluster should be ready, allowing reconcile to proceed. (used in the previously existing successful test and the new test added)

cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Namespace: namespace,
},
Status: cephv1.ClusterStatus{
Phase: k8sutil.ReadyStatus,
CephStatus: &cephv1.CephStatus{
Health: "HEALTH_OK",
},
},
}

r := setupNewEnvironment(cephCluster)

secrets := map[string][]byte{
"fsid": []byte(name),
"mon-secret": []byte("monsecret"),
Expand All @@ -395,18 +413,11 @@ func TestCephObjectStoreController(t *testing.T) {
Data: secrets,
Type: k8sutil.RookType,
}
_, err := c.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
_, err := r.context.Clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
assert.NoError(t, err)

// Add ready status to the CephCluster
cephCluster.Status.Phase = k8sutil.ReadyStatus
cephCluster.Status.CephStatus.Health = "HEALTH_OK"

// Create a fake client to mock API calls.
cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()

// Override executor with the new ceph status and more content
executor = &exectest.MockExecutor{
executor := &exectest.MockExecutor{
MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
if args[0] == "status" {
return `{"fsid":"c47cac40-9bee-4d52-823b-ccd803ba5bfe","health":{"checks":{},"status":"HEALTH_OK"},"pgmap":{"num_pgs":100,"pgs_by_state":[{"state_name":"active+clean","count":100}]}}`, nil
Expand Down Expand Up @@ -452,21 +463,36 @@ func TestCephObjectStoreController(t *testing.T) {
return "", nil
},
}
c.Executor = executor
r.context.Executor = executor

// Create a ReconcileCephObjectStore object with the scheme and fake client.
r = &ReconcileCephObjectStore{
client: cl,
scheme: s,
context: c,
objectStoreContexts: make(map[string]*objectStoreHealth),
recorder: k8sutil.NewEventReporter(record.NewFakeRecorder(5)),
opManagerContext: context.TODO(),
return r
}

t.Run("error - failed to start health checker", func(t *testing.T) {
r := setupEnvironmentWithReadyCephCluster()

// cause a failure when creating the admin ops api for the health check
origHTTPClientFunc := genObjectStoreHTTPClientFunc
genObjectStoreHTTPClientFunc = func(objContext *Context, spec *cephv1.ObjectStoreSpec) (client *http.Client, tlsCert []byte, err error) {
return nil, []byte{}, errors.New("induced error creating admin ops API connection")
}
defer func() { genObjectStoreHTTPClientFunc = origHTTPClientFunc }()

_, err := r.Reconcile(ctx, req)
assert.Error(t, err)
// we don't actually care if Requeue is true if there is an error assert.True(t, res.Requeue)
assert.Contains(t, err.Error(), "failed to start rgw health checker")
assert.Contains(t, err.Error(), "induced error creating admin ops API connection")
})

t.Run("success - object store is running", func(t *testing.T) {
r := setupEnvironmentWithReadyCephCluster()

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.False(t, res.Requeue)

objectStore := &cephv1.CephObjectStore{}
err = r.client.Get(context.TODO(), req.NamespacedName, objectStore)
assert.NoError(t, err)
assert.Equal(t, cephv1.ConditionProgressing, objectStore.Status.Phase, objectStore)
Expand Down
5 changes: 4 additions & 1 deletion pkg/operator/ceph/object/rgw.go
Expand Up @@ -348,7 +348,10 @@ func GetTlsCaCert(objContext *Context, objectStoreSpec *cephv1.ObjectStoreSpec)
return tlsCert, nil
}

func GenObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
// Allow overriding this function for unit tests to mock the admin ops api
var genObjectStoreHTTPClientFunc = genObjectStoreHTTPClient

func genObjectStoreHTTPClient(objContext *Context, spec *cephv1.ObjectStoreSpec) (*http.Client, []byte, error) {
nsName := fmt.Sprintf("%s/%s", objContext.clusterInfo.Namespace, objContext.Name)
c := &http.Client{}
tlsCert := []byte{}
Expand Down