Skip to content

Commit

Permalink
object: fix handling for notifications for OBC
Browse files Browse the repository at this point in the history
Current approach is to delete all the existing notifcations from bucket
and re-add the notification from the labels.

Signed-off-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
  • Loading branch information
thotz committed Dec 17, 2021
1 parent d40e6d7 commit 9d9acc2
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 111 deletions.
1 change: 1 addition & 0 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -52,6 +52,7 @@ var logger = capnslog.NewPackageLogger("github.com/rook/rook", packageName)
var waitForRequeueIfTopicNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfNotificationNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfObjectBucketNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfNotificationNotDeleted = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}

// ReconcileNotifications reconciles a CephbucketNotification
type ReconcileNotifications struct {
Expand Down
68 changes: 48 additions & 20 deletions pkg/operator/ceph/object/notification/controller_test.go
Expand Up @@ -43,30 +43,58 @@ import (
)

var (
testTopicName = "topic-a"
testNotificationName = "notification-a"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
testTopicName = "topic-a"
testNotificationName = "notification"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
deleteBucketName = "delete"
noChangeBucketName = "nochange"
multipleCreateBucketName = "multi-create"
multipleDeleteBucketName = "multi-delete"
multipleBothBucketName = "multi-both"
)

var createWasInvoked bool
var getWasInvoked bool
var createdNotifications []string
var deletedNotifications []string

func resetValues() {
getWasInvoked = false
createdNotifications = nil
deletedNotifications = nil

}
func mockCleanup() {
createWasInvoked = false
resetValues()
createNotificationFunc = createNotification
deleteAllNotificationsFunc = deleteAllNotifications
getAllNotificationsFunc = getAllRGWNotifications
deleteNotificationFunc = deleteNotification
}

func mockSetup() {
createWasInvoked = false
createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error {
createWasInvoked = true
createdNotifications = append(createdNotifications, notification.Name)
return nil
}
deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error {
getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) {
getWasInvoked = true
if bucket.Name == deleteBucketName {
return []string{deleteBucketName + testNotificationName}, nil
}
if bucket.Name == noChangeBucketName {
return []string{testNotificationName}, nil
}
if bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName {
return []string{multipleDeleteBucketName + testNotificationName + "-1",
multipleDeleteBucketName + testNotificationName + "-2"}, nil
}
return nil, nil
}
deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error {
deletedNotifications = append(deletedNotifications, notificationId)
return nil
}
}
Expand Down Expand Up @@ -181,7 +209,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) {
Expand All @@ -201,7 +229,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) {
Expand All @@ -222,7 +250,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) {
Expand All @@ -245,7 +273,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration", func(t *testing.T) {
Expand All @@ -268,7 +296,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})
}

Expand Down Expand Up @@ -404,7 +432,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("provision notification when OB exists", func(t *testing.T) {
Expand Down Expand Up @@ -448,7 +476,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.True(t, createWasInvoked)
assert.Equal(t, 1, len(createdNotifications))
})
}

Expand Down
84 changes: 57 additions & 27 deletions pkg/operator/ceph/object/notification/obc_label_controller.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rook/rook/pkg/operator/ceph/object/topic"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}

// delete all existing notifications
// get all existing notifications
p := provisioner{
context: r.context,
clusterInfo: clusterInfo,
Expand All @@ -136,11 +137,13 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
owner: ob.Spec.AdditionalState[bucket.CephUser],
objectStoreName: objectStoreName,
}
err = deleteAllNotificationsFunc(p, &ob)
bnList, err := getAllNotificationsFunc(p, &ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName)
}

labelList := make([]string, 0)
deleteList := make([]string, 0)
// looking for notifications in the labels
for labelKey, labelValue := range obc.Labels {
notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2)
Expand All @@ -149,37 +152,64 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue)
continue
}
// for each notification label fetch the bucket notification CRD
labelList = append(labelList, labelValue)
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}
}
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}
// remove notifications which are no longer specified in the OBC labels
for _, oldValue := range bnList {
if !sets.NewString(labelList...).Has(oldValue) {
deleteList = append(deleteList, oldValue)
}
}
retry := false
for _, notificationId := range deleteList {
err = deleteNotificationFunc(p, &ob, notificationId)
if err != nil {
logger.Errorf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err)
retry = true
}
}
if retry {
return waitForRequeueIfNotificationNotDeleted, nil
}
// add new notifications to the list
return r.addNewNotifications(p, ob, labelList, objectStoreName, obc.Namespace)
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) {
for _, labelValue := range notificationList {
// for each notification label fetch the bucket notification CRD
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found in %q ", bnName.Name, bnName.Namespace)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
logger.Debugf("adding bucket notification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name)
// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet in %q", topicName.Name, topicName.Namespace)
return waitForRequeueIfTopicNotReady, nil
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name)
}

return reconcile.Result{}, nil
Expand Down

0 comments on commit 9d9acc2

Please sign in to comment.