Skip to content

Commit

Permalink
rgw: add recocile failure events to CephBucketNotification
Browse files Browse the repository at this point in the history
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif committed Dec 21, 2021
1 parent 570f09e commit f5acb87
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
17 changes: 15 additions & 2 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -19,6 +19,7 @@ package notification

import (
"context"
"fmt"
"time"

"github.com/coreos/pkg/capnslog"
Expand Down Expand Up @@ -140,16 +141,20 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
failureMessage := fmt.Sprintf("Reconcile failed for %s, topic %q not provisioned yet", bnName, topicName)
logger.Info(failureMessage)
r.recorder.Event(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), failureMessage)
return waitForRequeueIfTopicNotReady, nil
}

// Populate clusterInfo during each reconcile
clusterInfo, clusterSpec, err := getReadyCluster(r.client, r.opManagerContext, *r.context, bucketTopic.Spec.ObjectStoreNamespace)
if err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return opcontroller.WaitForRequeueIfCephClusterNotReady, errors.Wrapf(err, "cluster is not ready")
}
if clusterInfo == nil || clusterSpec == nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s, cluster is not ready", bnName)
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}

Expand All @@ -161,6 +166,7 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
obcList := &bktv1alpha1.ObjectBucketClaimList{}
err = r.client.List(r.opManagerContext, obcList, namespaceListOpt, labelListOpt)
if err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName)
}
if len(obcList.Items) == 0 {
Expand All @@ -172,19 +178,25 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
// loop through all OBCs in the list and get their OBs
for _, obc := range obcList.Items {
if obc.Spec.ObjectBucketName == "" {
logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
failureMessage := fmt.Sprintf("Reconcile failed for %s, ObjectBucketClaim %q did not create the bucket yet",
bnName, types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
logger.Info(failureMessage)
r.recorder.Event(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), failureMessage)
return waitForRequeueIfObjectBucketNotReady, nil
}
ob := bktv1alpha1.ObjectBucket{}
bucketName := types.NamespacedName{Namespace: namespace, Name: obc.Spec.ObjectBucketName}
if err := r.client.Get(r.opManagerContext, bucketName, &ob); err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName)
}
objectStoreName, err := getCephObjectStoreName(ob)
if err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName)
}
if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, err
}

Expand All @@ -202,6 +214,7 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
notification,
)
if err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
}
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.Provisioned), "Notification %q provisioned on bucket %q", bnName, bucketName)
Expand Down
22 changes: 17 additions & 5 deletions pkg/operator/ceph/object/notification/controller_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -56,6 +57,7 @@ var (
bnName = types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}
startEvent = fmt.Sprintf("%s %s Start reconciling %q", v1.EventTypeNormal, string(cephv1.ReconcileStarted), bnName)
finishedEvent = fmt.Sprintf("%s %s Successfully reconciled %q", v1.EventTypeNormal, string(cephv1.ReconcileSucceeded), bnName)
failedEvent = fmt.Sprintf("%s %s Reconcile failed for %s", v1.EventTypeNormal, string(cephv1.ReconcileFailed), bnName)
provisionedEvent = fmt.Sprintf("%s %s Notification %q provisioned on bucket %q",
v1.EventTypeNormal, string(cephv1.Provisioned), bnName, types.NamespacedName{Namespace: testNamespace, Name: testBucketName})
)
Expand Down Expand Up @@ -162,6 +164,16 @@ func verifyEvents(t *testing.T, expectedEvents []string) {
for _, expectedEvent := range expectedEvents {
select {
case event := <-testRecorder.Events:
if expectedEvent != "END" {
splitEvent := strings.Split(expectedEvent, " ")
// the event message must have at least 3 parts
assert.GreaterOrEqual(t, len(splitEvent), 3)
if splitEvent[1] == string(cephv1.ReconcileFailed) {
// for failure events we don't verify the actual error message
assert.True(t, strings.HasPrefix(event, failedEvent))
continue
}
}
assert.Equal(t, expectedEvent, event)
case <-time.After(1 * time.Second):
assert.Failf(t, "missing event", "missing event: \"%s\"", expectedEvent)
Expand Down Expand Up @@ -229,7 +241,7 @@ func TestCephBucketNotificationController(t *testing.T) {
assert.True(t, res.Requeue)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) {
Expand All @@ -244,7 +256,7 @@ func TestCephBucketNotificationController(t *testing.T) {
assert.True(t, res.Requeue)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) {
Expand All @@ -260,7 +272,7 @@ func TestCephBucketNotificationController(t *testing.T) {
assert.True(t, res.Requeue)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) {
Expand All @@ -277,7 +289,7 @@ func TestCephBucketNotificationController(t *testing.T) {
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("create notification and topic configuration", func(t *testing.T) {
Expand Down Expand Up @@ -377,7 +389,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("provision notification when OB exists", func(t *testing.T) {
Expand Down
Expand Up @@ -19,6 +19,7 @@ package notification

import (
"context"
"fmt"
"strings"

bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
Expand Down Expand Up @@ -170,17 +171,21 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
failureMessage := fmt.Sprintf("Reconcile failed for %s, topic %q not provisioned yet", bnName, topicName)
logger.Info(failureMessage)
r.recorder.Event(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), failureMessage)
return waitForRequeueIfTopicNotReady, nil
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, err
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileFailed), "Reconcile failed for %s with error %s", bnName, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
Expand Down
Expand Up @@ -173,7 +173,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.False(t, createWasInvoked)
verifyEvents(t, []string{startEvent})
verifyEvents(t, []string{startEvent, failedEvent})
})

t.Run("provision OBC with notification label", func(t *testing.T) {
Expand Down

0 comments on commit f5acb87

Please sign in to comment.