From f1dee39262bf26e88b80943dfa05e17cf9f9bfe8 Mon Sep 17 00:00:00 2001 From: Jiffin Tony Thottan Date: Wed, 1 Dec 2021 17:49:40 +0530 Subject: [PATCH] object: fix handling for notifications for OBC Current approach is to delete all the existing notifcations from bucket and re-add the notification from the labels. Signed-off-by: Jiffin Tony Thottan --- .../object/notification/controller_test.go | 20 +++- .../notification/obc_label_controller.go | 91 +++++++++++++------ .../notification/obc_label_controller_test.go | 60 ++++++++++++ .../ceph/object/notification/provisioner.go | 40 +++++++- .../ceph/object/notification/s3ext.go | 9 +- 5 files changed, 182 insertions(+), 38 deletions(-) diff --git a/pkg/operator/ceph/object/notification/controller_test.go b/pkg/operator/ceph/object/notification/controller_test.go index 0859a2fe9b563..d8f0460d02051 100644 --- a/pkg/operator/ceph/object/notification/controller_test.go +++ b/pkg/operator/ceph/object/notification/controller_test.go @@ -52,12 +52,15 @@ var ( testSCName = "my-storage-class" ) -var createWasInvoked bool +var createWasInvoked, getWasInvoked, deleteWasInvoked bool func mockCleanup() { createWasInvoked = false + getWasInvoked = false + deleteWasInvoked = false createNotificationFunc = createNotification - deleteAllNotificationsFunc = deleteAllNotifications + getAllNotificationsFunc = getAllNotifications + deleteNotificationFunc = deleteNotification } func mockSetup() { @@ -66,7 +69,18 @@ func mockSetup() { createWasInvoked = true return nil } - deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { + getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) { + getWasInvoked = true + if bucket.Name == "delete" { + return []string{"delete" + testNotificationName}, nil + } + if bucket.Name == "nochange" { + return []string{testNotificationName}, nil + } + return nil, nil + } + deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { + deleteWasInvoked = true return nil } } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller.go b/pkg/operator/ceph/object/notification/obc_label_controller.go index 15f20a4c1af7e..2a37f90927634 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller.go @@ -127,7 +127,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res return opcontroller.WaitForRequeueIfCephClusterNotReady, nil } - // delete all existing notifications + // get all existing notifications p := provisioner{ context: r.context, clusterInfo: clusterInfo, @@ -136,11 +136,14 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res owner: ob.Spec.AdditionalState[bucket.CephUser], objectStoreName: objectStoreName, } - err = deleteAllNotificationsFunc(p, &ob) + bnList, err := getAllNotificationsFunc(p, &ob) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName) } + labelList := make([]string, 0) + addList := make([]string, 0) + deleteList := make([]string, 0) // looking for notifications in the labels for labelKey, labelValue := range obc.Labels { notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2) @@ -149,36 +152,70 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue) continue } - // for each notification label fetch the bucket notification CRD - logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName) - notification := &cephv1.CephBucketNotification{} - bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue} - if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { - if kerrors.IsNotFound(err) { - logger.Infof("CephBucketNotification %q not found", bnName) - return waitForRequeueIfNotificationNotReady, nil + labelList = append(labelList, labelValue) + found := false + for _, existingValue := range bnList { + if labelValue == existingValue { + found = true + break } - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) } - - // get the topic associated with the notification, and make sure it is provisioned - topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} - bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) - if err != nil { - logger.Infof("CephBucketTopic %q not provisioned yet", topicName) - return waitForRequeueIfTopicNotReady, nil + // add it into new list for creating notifications for the bucket + if !found { + addList = append(addList, labelValue) } - - if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { - return reconcile.Result{}, err + } + } + // remove notifications which are no longer specified in the OBC labels + for _, oldValue := range bnList { + found := false + for _, newValue := range labelList { + if oldValue == newValue { + found = true + break } - - // provision the notification - err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) - if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) + } + if !found { + deleteList = append(deleteList, oldValue) + } + } + for _, labelValue := range addList { + // for each notification label fetch the bucket notification CRD + logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName) + notification := &cephv1.CephBucketNotification{} + bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue} + if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { + if kerrors.IsNotFound(err) { + logger.Infof("CephBucketNotification %q not found", bnName) + return waitForRequeueIfNotificationNotReady, nil } - logger.Infof("provisioned CephBucketNotification %q", bnName) + return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) + } + + // get the topic associated with the notification, and make sure it is provisioned + topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} + bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) + if err != nil { + logger.Infof("CephBucketTopic %q not provisioned yet", topicName) + return waitForRequeueIfTopicNotReady, nil + } + + if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { + return reconcile.Result{}, err + } + + // provision the notification + err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) + if err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) + } + logger.Infof("provisioned CephBucketNotification %q", bnName) + } + + for _, notificationId := range deleteList { + err = deleteNotificationFunc(p, &ob, notificationId) + if err != nil { + logger.Warningf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err) } } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller_test.go b/pkg/operator/ceph/object/notification/obc_label_controller_test.go index c7fbaf3620b29..5663c5c8fe419 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller_test.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller_test.go @@ -189,7 +189,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) + assert.False(t, getWasInvoked) assert.False(t, createWasInvoked) + assert.False(t, deleteWasInvoked) }) t.Run("provision OBC with notification label with not ready ob", func(t *testing.T) { @@ -206,7 +208,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) + assert.False(t, getWasInvoked) assert.False(t, createWasInvoked) + assert.False(t, deleteWasInvoked) }) obc.Spec.ObjectBucketName = testBucketName @@ -226,7 +230,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) + assert.True(t, getWasInvoked) assert.False(t, createWasInvoked) + assert.False(t, deleteWasInvoked) }) t.Run("provision OBC with notification label and notification with no topic", func(t *testing.T) { @@ -244,7 +250,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) + assert.True(t, getWasInvoked) assert.False(t, createWasInvoked) + assert.False(t, deleteWasInvoked) }) t.Run("provision OBC with notification label", func(t *testing.T) { @@ -263,6 +271,58 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) assert.True(t, createWasInvoked) + assert.False(t, deleteWasInvoked) + }) + + t.Run("reconcile with already exisitng label for the obc", func(t *testing.T) { + nochangeOB := ob.DeepCopy() + nochangeOB.Name = "nochange" + nochangeOBC := obc.DeepCopy() + nochangeOBC.Name = "nochange" + objects := []runtime.Object{ + cephCluster, + nochangeOBC, + nochangeOB, + bucketNotification, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.False(t, createWasInvoked) + assert.False(t, deleteWasInvoked) + }) + + t.Run("delete notification for the obc", func(t *testing.T) { + deleteOB := ob.DeepCopy() + deleteOB.Name = "delete" + deleteOBC := obc.DeepCopy() + deleteOBC.Name = "delete" + objects := []runtime.Object{ + cephCluster, + deleteOBC, + deleteOB, + bucketNotification, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.False(t, createWasInvoked) + assert.True(t, deleteWasInvoked) }) } diff --git a/pkg/operator/ceph/object/notification/provisioner.go b/pkg/operator/ceph/object/notification/provisioner.go index 5e9c6a3c9e379..be856a5e1b536 100644 --- a/pkg/operator/ceph/object/notification/provisioner.go +++ b/pkg/operator/ceph/object/notification/provisioner.go @@ -30,6 +30,7 @@ import ( "github.com/rook/rook/pkg/clusterd" cephclient "github.com/rook/rook/pkg/daemon/ceph/client" "github.com/rook/rook/pkg/operator/ceph/object" + "github.com/rook/rook/pkg/operator/ceph/object/bucket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -166,9 +167,38 @@ var createNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, t } // Allow overriding this function for unit tests -var deleteAllNotificationsFunc = deleteAllNotifications +var getAllNotificationsFunc = getAllNotifications -var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { +var getAllNotifications = func(p provisioner, ob *bktv1alpha1.ObjectBucket) ([]string, error) { + bucketName := ob.Spec.Endpoint.BucketName + ownerName := ob.Spec.AdditionalState[bucket.CephUser] + s3Agent, err := newS3Agent(p) + if err != nil { + return nil, errors.Wrapf(err, "failed to create S3 agent for CephBucketNotification provisioning for bucket %q", bucketName) + } + nc, err := s3Agent.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{ + Bucket: &bucketName, + ExpectedBucketOwner: &ownerName, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to get BucketNotification from bucket %q", bucketName) + } + + notificationList := make([]string, 0) + if nc != nil { + for _, tc := range nc.TopicConfigurations { + notificationList = append(notificationList, *tc.Id) + } + } + logger.Debugf("Bucket Notifications %q was listed for bucket %q", notificationList, bucketName) + + return notificationList, nil +} + +// Allow overriding this function for unit tests +var deleteNotificationFunc = deleteNotification + +var deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { bucketName := types.NamespacedName{Namespace: bucket.Namespace, Name: bucket.Name} s3Agent, err := newS3Agent(p) if err != nil { @@ -176,11 +206,11 @@ var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucke } if err := DeleteBucketNotification(s3Agent.Client, &DeleteBucketNotificationRequestInput{ Bucket: &bucket.Spec.Endpoint.BucketName, - }); err != nil { - return errors.Wrapf(err, "failed to delete all bucket notifications from bucket %q", bucketName) + }, notificationId); err != nil { + return errors.Wrapf(err, "failed to delete bucket notification %q from bucket %q", notificationId, bucketName) } - logger.Infof("all bucket notifications deleted from bucket %q", bucketName) + logger.Debugf("bucket notification %q deleted from bucket %q", notificationId, bucketName) return nil } diff --git a/pkg/operator/ceph/object/notification/s3ext.go b/pkg/operator/ceph/object/notification/s3ext.go index 7de5ce05f9b62..a481b1a1589a7 100644 --- a/pkg/operator/ceph/object/notification/s3ext.go +++ b/pkg/operator/ceph/object/notification/s3ext.go @@ -66,13 +66,16 @@ func (s *DeleteBucketNotificationRequestInput) Validate() error { const opDeleteBucketNotification = "DeleteBucketNotification" -func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput) *request.Request { +func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) *request.Request { op := &request.Operation{ Name: opDeleteBucketNotification, HTTPMethod: http.MethodDelete, HTTPPath: "/{Bucket}?notification", } + if len(notificationId) > 0 { + op.HTTPPath = "/{Bucket}?notification=" + notificationId + } if input == nil { input = &DeleteBucketNotificationRequestInput{} } @@ -80,7 +83,7 @@ func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRe return c.NewRequest(op, input, nil) } -func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput) error { - req := DeleteBucketNotificationRequest(c, input) +func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) error { + req := DeleteBucketNotificationRequest(c, input, notificationId) return req.Send() }