Skip to content

Commit

Permalink
object: fix handling for notifications for OBC
Browse files Browse the repository at this point in the history
Current approach is to delete all the existing notifcations from bucket
and re-add the notification from the labels.

Signed-off-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
  • Loading branch information
thotz committed Dec 14, 2021
1 parent d40e6d7 commit 4e8f783
Show file tree
Hide file tree
Showing 5 changed files with 388 additions and 113 deletions.
86 changes: 64 additions & 22 deletions pkg/operator/ceph/object/notification/controller_test.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/coreos/pkg/capnslog"
bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
"github.com/pkg/errors"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
"github.com/rook/rook/pkg/operator/test"
Expand All @@ -43,31 +44,72 @@ import (
)

var (
testTopicName = "topic-a"
testNotificationName = "notification-a"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
testTopicName = "topic-a"
testNotificationName = "notification"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
deleteBucketName = "delete"
noChangeBucketName = "nochange"
multipleCreateBucketName = "multi-create"
multipleDeleteBucketName = "multi-delete"
multipleBothBucketName = "multi-both"
)

var createWasInvoked bool
var getWasInvoked bool
var createNotificationCount, deleteNotificationCount int

func resetValues() {
getWasInvoked = false
createNotificationCount = 0
deleteNotificationCount = 0
}
func mockCleanup() {
createWasInvoked = false
resetValues()
createNotificationFunc = createNotification
deleteAllNotificationsFunc = deleteAllNotifications
getAllNotificationsFunc = getAllRGWNotifications
deleteNotificationFunc = deleteNotification
}

func mockSetup() {
createWasInvoked = false
createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error {
createWasInvoked = true
return nil
createNotificationCount++
if bucket.Name == testBucketName && notification.Name == testNotificationName {
return nil
}
if (bucket.Name == multipleCreateBucketName || bucket.Name == multipleBothBucketName) &&
(notification.Name == testNotificationName+"-1" || notification.Name == testNotificationName+"-2") {
return nil
}
return errors.New("Invalid notification for create call")
}
deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error {
return nil
getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) {
getWasInvoked = true
if bucket.Name == deleteBucketName {
return []string{deleteBucketName + testNotificationName}, nil
}
if bucket.Name == noChangeBucketName {
return []string{testNotificationName}, nil
}
if bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName {
return []string{multipleDeleteBucketName + testNotificationName + "-1",
multipleDeleteBucketName + testNotificationName + "-2"}, nil
}
return nil, nil
}
deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error {
deleteNotificationCount++
if bucket.Name == deleteBucketName && notificationId == deleteBucketName+testNotificationName {
return nil
}
if (bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName) &&
(notificationId == multipleDeleteBucketName+testNotificationName+"-1" ||
notificationId == multipleDeleteBucketName+testNotificationName+"-2") {
return nil
}
return errors.New("Invalid notification name for delete call")
}
}

Expand Down Expand Up @@ -181,7 +223,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})

t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) {
Expand All @@ -201,7 +243,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})

t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) {
Expand All @@ -222,7 +264,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})

t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) {
Expand All @@ -245,7 +287,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})

t.Run("create notification and topic configuration", func(t *testing.T) {
Expand All @@ -268,7 +310,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})
}

Expand Down Expand Up @@ -404,7 +446,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, createNotificationCount)
})

t.Run("provision notification when OB exists", func(t *testing.T) {
Expand Down Expand Up @@ -448,7 +490,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.True(t, createWasInvoked)
assert.Equal(t, 1, createNotificationCount)
})
}

Expand Down
85 changes: 58 additions & 27 deletions pkg/operator/ceph/object/notification/obc_label_controller.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rook/rook/pkg/operator/ceph/object/topic"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}

// delete all existing notifications
// get all existing notifications
p := provisioner{
context: r.context,
clusterInfo: clusterInfo,
Expand All @@ -136,11 +137,14 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
owner: ob.Spec.AdditionalState[bucket.CephUser],
objectStoreName: objectStoreName,
}
err = deleteAllNotificationsFunc(p, &ob)
bnList, err := getAllNotificationsFunc(p, &ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName)
}

labelList := make([]string, 0)
addList := make([]string, 0)
deleteList := make([]string, 0)
// looking for notifications in the labels
for labelKey, labelValue := range obc.Labels {
notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2)
Expand All @@ -149,37 +153,64 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue)
continue
}
// for each notification label fetch the bucket notification CRD
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
labelList = append(labelList, labelValue)
// add it into new list for creating notifications for the bucket
if !sets.NewString(bnList...).Has(labelValue) {
addList = append(addList, labelValue)
}
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
}
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}
// remove notifications which are no longer specified in the OBC labels
for _, oldValue := range bnList {
if !sets.NewString(labelList...).Has(oldValue) {
deleteList = append(deleteList, oldValue)
}
}
for _, notificationId := range deleteList {
err = deleteNotificationFunc(p, &ob, notificationId)
if err != nil {
logger.Warningf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err)
}
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}
// add new notifications to the list
return r.addNewNotifications(p, ob, addList, objectStoreName, obc.Namespace)
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) {
for _, labelValue := range notificationList {
logger.Debugf("adding bucket notification %q", labelValue)
// for each notification label fetch the bucket notification CRD
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
}

return reconcile.Result{}, nil
Expand Down

0 comments on commit 4e8f783

Please sign in to comment.