From 58dc4121ed7d8f80d2b01f2d5418f9b3b4307368 Mon Sep 17 00:00:00 2001 From: Jiffin Tony Thottan Date: Wed, 1 Dec 2021 17:49:40 +0530 Subject: [PATCH] object: fix handling for notifications for OBC Current approach is to delete all the existing notifcations from bucket and re-add the notification from the labels. Signed-off-by: Jiffin Tony Thottan --- .../object/notification/controller_test.go | 86 ++++-- .../notification/obc_label_controller.go | 85 ++++-- .../notification/obc_label_controller_test.go | 281 ++++++++++++++---- .../ceph/object/notification/provisioner.go | 40 ++- .../ceph/object/notification/s3ext.go | 9 +- 5 files changed, 388 insertions(+), 113 deletions(-) diff --git a/pkg/operator/ceph/object/notification/controller_test.go b/pkg/operator/ceph/object/notification/controller_test.go index 0859a2fe9b563..303b68233d753 100644 --- a/pkg/operator/ceph/object/notification/controller_test.go +++ b/pkg/operator/ceph/object/notification/controller_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/pkg/capnslog" bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" + "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" "github.com/rook/rook/pkg/operator/test" @@ -43,31 +44,72 @@ import ( ) var ( - testTopicName = "topic-a" - testNotificationName = "notification-a" - testNamespace = "rook-ceph" - testStoreName = "test-store" - testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName - testBucketName = "my-bucket" - testSCName = "my-storage-class" + testTopicName = "topic-a" + testNotificationName = "notification" + testNamespace = "rook-ceph" + testStoreName = "test-store" + testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName + testBucketName = "my-bucket" + testSCName = "my-storage-class" + deleteBucketName = "delete" + noChangeBucketName = "nochange" + multipleCreateBucketName = "multi-create" + multipleDeleteBucketName = "multi-delete" + multipleBothBucketName = "multi-both" ) -var createWasInvoked bool +var getWasInvoked bool +var createNotificationCount, deleteNotificationCount int +func resetValues() { + getWasInvoked = false + createNotificationCount = 0 + deleteNotificationCount = 0 +} func mockCleanup() { - createWasInvoked = false + resetValues() createNotificationFunc = createNotification - deleteAllNotificationsFunc = deleteAllNotifications + getAllNotificationsFunc = getAllRGWNotifications + deleteNotificationFunc = deleteNotification } func mockSetup() { - createWasInvoked = false createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error { - createWasInvoked = true - return nil + createNotificationCount++ + if bucket.Name == testBucketName && notification.Name == testNotificationName { + return nil + } + if (bucket.Name == multipleCreateBucketName || bucket.Name == multipleBothBucketName) && + (notification.Name == testNotificationName+"-1" || notification.Name == testNotificationName+"-2") { + return nil + } + return errors.New("Invalid notification for create call") } - deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { - return nil + getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) { + getWasInvoked = true + if bucket.Name == deleteBucketName { + return []string{deleteBucketName + testNotificationName}, nil + } + if bucket.Name == noChangeBucketName { + return []string{testNotificationName}, nil + } + if bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName { + return []string{multipleDeleteBucketName + testNotificationName + "-1", + multipleDeleteBucketName + testNotificationName + "-2"}, nil + } + return nil, nil + } + deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { + deleteNotificationCount++ + if bucket.Name == deleteBucketName && notificationId == deleteBucketName+testNotificationName { + return nil + } + if (bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName) && + (notificationId == multipleDeleteBucketName+testNotificationName+"-1" || + notificationId == multipleDeleteBucketName+testNotificationName+"-2") { + return nil + } + return errors.New("Invalid notification name for delete call") } } @@ -181,7 +223,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) { @@ -201,7 +243,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) { @@ -222,7 +264,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) { @@ -245,7 +287,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) t.Run("create notification and topic configuration", func(t *testing.T) { @@ -268,7 +310,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) } @@ -404,7 +446,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, createNotificationCount) }) t.Run("provision notification when OB exists", func(t *testing.T) { @@ -448,7 +490,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.True(t, createWasInvoked) + assert.Equal(t, 1, createNotificationCount) }) } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller.go b/pkg/operator/ceph/object/notification/obc_label_controller.go index 15f20a4c1af7e..5a1bc29423d71 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller.go @@ -30,6 +30,7 @@ import ( "github.com/rook/rook/pkg/operator/ceph/object/topic" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -127,7 +128,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res return opcontroller.WaitForRequeueIfCephClusterNotReady, nil } - // delete all existing notifications + // get all existing notifications p := provisioner{ context: r.context, clusterInfo: clusterInfo, @@ -136,11 +137,14 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res owner: ob.Spec.AdditionalState[bucket.CephUser], objectStoreName: objectStoreName, } - err = deleteAllNotificationsFunc(p, &ob) + bnList, err := getAllNotificationsFunc(p, &ob) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName) } + labelList := make([]string, 0) + addList := make([]string, 0) + deleteList := make([]string, 0) // looking for notifications in the labels for labelKey, labelValue := range obc.Labels { notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2) @@ -149,37 +153,64 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue) continue } - // for each notification label fetch the bucket notification CRD - logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName) - notification := &cephv1.CephBucketNotification{} - bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue} - if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { - if kerrors.IsNotFound(err) { - logger.Infof("CephBucketNotification %q not found", bnName) - return waitForRequeueIfNotificationNotReady, nil - } - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) + labelList = append(labelList, labelValue) + // add it into new list for creating notifications for the bucket + if !sets.NewString(bnList...).Has(labelValue) { + addList = append(addList, labelValue) } + logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName) + } + } - // get the topic associated with the notification, and make sure it is provisioned - topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} - bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) - if err != nil { - logger.Infof("CephBucketTopic %q not provisioned yet", topicName) - return waitForRequeueIfTopicNotReady, nil - } + // remove notifications which are no longer specified in the OBC labels + for _, oldValue := range bnList { + if !sets.NewString(labelList...).Has(oldValue) { + deleteList = append(deleteList, oldValue) + } + } + for _, notificationId := range deleteList { + err = deleteNotificationFunc(p, &ob, notificationId) + if err != nil { + logger.Warningf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err) + } + } - if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { - return reconcile.Result{}, err - } + // add new notifications to the list + return r.addNewNotifications(p, ob, addList, objectStoreName, obc.Namespace) +} - // provision the notification - err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) - if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) +func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) { + for _, labelValue := range notificationList { + logger.Debugf("adding bucket notification %q", labelValue) + // for each notification label fetch the bucket notification CRD + notification := &cephv1.CephBucketNotification{} + bnName := types.NamespacedName{Namespace: namespace, Name: labelValue} + if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { + if kerrors.IsNotFound(err) { + logger.Infof("CephBucketNotification %q not found", bnName) + return waitForRequeueIfNotificationNotReady, nil } - logger.Infof("provisioned CephBucketNotification %q", bnName) + return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) + } + + // get the topic associated with the notification, and make sure it is provisioned + topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} + bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) + if err != nil { + logger.Infof("CephBucketTopic %q not provisioned yet", topicName) + return waitForRequeueIfTopicNotReady, nil + } + + if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { + return reconcile.Result{}, err + } + + // provision the notification + err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) + if err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) } + logger.Infof("provisioned CephBucketNotification %q", bnName) } return reconcile.Result{}, nil diff --git a/pkg/operator/ceph/object/notification/obc_label_controller_test.go b/pkg/operator/ceph/object/notification/obc_label_controller_test.go index c7fbaf3620b29..6513ea481f535 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller_test.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller_test.go @@ -42,6 +42,67 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) +func createOBResources(name string) (*bktv1alpha1.ObjectBucketClaim, *bktv1alpha1.ObjectBucket) { + return &bktv1alpha1.ObjectBucketClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ObjectBucketClaim", + }, + Spec: bktv1alpha1.ObjectBucketClaimSpec{ + StorageClassName: testSCName, + GenerateBucketName: name, + }, + Status: bktv1alpha1.ObjectBucketClaimStatus{ + Phase: bktv1alpha1.ObjectBucketClaimStatusPhasePending, + }, + }, &bktv1alpha1.ObjectBucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ObjectBucket", + }, + Spec: bktv1alpha1.ObjectBucketSpec{ + StorageClassName: testSCName, + Connection: &bktv1alpha1.Connection{ + Endpoint: &bktv1alpha1.Endpoint{ + BucketHost: object.BuildDomainName(testStoreName, testNamespace), + }, + }, + }, + Status: bktv1alpha1.ObjectBucketStatus{ + Phase: bktv1alpha1.ObjectBucketStatusPhaseBound, + }, + } +} + +func createBucketNotification(name string) *cephv1.CephBucketNotification { + return &cephv1.CephBucketNotification{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "CephBucketNotification", + }, + Spec: cephv1.BucketNotificationSpec{ + Topic: testTopicName, + }, + } +} + +func setNotificationLabels(labelList []string) map[string]string { + var label = make(map[string]string) + for _, value := range labelList { + label[notificationLabelPrefix+value] = value + } + return label +} + func TestCephBucketNotificationOBCLabelController(t *testing.T) { mockSetup() defer mockCleanup() @@ -68,18 +129,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { }, Status: &cephv1.BucketTopicStatus{ARN: &testARN}, } - bucketNotification := &cephv1.CephBucketNotification{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNotificationName, - Namespace: testNamespace, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "CephBucketNotification", - }, - Spec: cephv1.BucketNotificationSpec{ - Topic: testTopicName, - }, - } + cephCluster := &cephv1.CephCluster{ ObjectMeta: metav1.ObjectMeta{ Name: testNamespace, @@ -92,45 +142,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { }, }, } - obc := &bktv1alpha1.ObjectBucketClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: testBucketName, - Namespace: testNamespace, - Labels: map[string]string{ - notificationLabelPrefix + testNotificationName: testNotificationName, - }, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "ObjectBucketClaim", - }, - Spec: bktv1alpha1.ObjectBucketClaimSpec{ - StorageClassName: testSCName, - GenerateBucketName: testBucketName, - }, - Status: bktv1alpha1.ObjectBucketClaimStatus{ - Phase: bktv1alpha1.ObjectBucketClaimStatusPhasePending, - }, - } - ob := &bktv1alpha1.ObjectBucket{ - ObjectMeta: metav1.ObjectMeta{ - Name: testBucketName, - Namespace: testNamespace, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "ObjectBucket", - }, - Spec: bktv1alpha1.ObjectBucketSpec{ - StorageClassName: testSCName, - Connection: &bktv1alpha1.Connection{ - Endpoint: &bktv1alpha1.Endpoint{ - BucketHost: object.BuildDomainName(testStoreName, testNamespace), - }, - }, - }, - Status: bktv1alpha1.ObjectBucketStatus{ - Phase: bktv1alpha1.ObjectBucketStatusPhaseBound, - }, - } + req := reconcile.Request{ NamespacedName: types.NamespacedName{ Name: testBucketName, @@ -173,10 +185,14 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { Type: k8sutil.RookType, } + obc, ob := createOBResources(testBucketName) + obc.Labels = setNotificationLabels([]string{testNotificationName}) + _, err := c.Clientset.CoreV1().Secrets(testNamespace).Create(ctx, secret, metav1.CreateOptions{}) assert.NoError(t, err) t.Run("provision OBC with notification label with no ob", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -189,10 +205,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.False(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) }) t.Run("provision OBC with notification label with not ready ob", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -206,13 +225,16 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.False(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) }) obc.Spec.ObjectBucketName = testBucketName obc.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound t.Run("provision OBC with notification label with no notification", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -226,10 +248,14 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) }) + bucketNotification := createBucketNotification(testNotificationName) t.Run("provision OBC with notification label and notification with no topic", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -244,10 +270,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) }) t.Run("provision OBC with notification label", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -263,6 +292,146 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.False(t, res.Requeue) - assert.True(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 1, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) + }) + + t.Run("reconcile with already existing label for the obc", func(t *testing.T) { + resetValues() + noChangeOBC, noChangeOB := createOBResources(noChangeBucketName) + noChangeOBC.Labels = setNotificationLabels([]string{testNotificationName}) + noChangeOBC.Spec.ObjectBucketName = noChangeBucketName + noChangeOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = noChangeBucketName + objects := []runtime.Object{ + cephCluster, + noChangeOBC, + noChangeOB, + bucketNotification, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) + }) + + t.Run("delete notification from the obc", func(t *testing.T) { + resetValues() + deleteOBC, deleteOB := createOBResources(deleteBucketName) + deleteOBC.Spec.GenerateBucketName = deleteBucketName + deleteOBC.Spec.ObjectBucketName = deleteBucketName + deleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = deleteBucketName + objects := []runtime.Object{ + cephCluster, + deleteOBC, + deleteOB, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 1, deleteNotificationCount) + }) + + t.Run("provision OBC with multiple notification labels", func(t *testing.T) { + resetValues() + multipleCreateOBC, multipleCreateOB := createOBResources(multipleCreateBucketName) + multipleCreateOBC.Labels = setNotificationLabels([]string{testNotificationName + "-1", testNotificationName + "-2"}) + multipleCreateOBC.Spec.ObjectBucketName = multipleCreateBucketName + multipleCreateOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + bucketNotification1 := createBucketNotification(testNotificationName + "-1") + bucketNotification2 := createBucketNotification(testNotificationName + "-2") + req.NamespacedName.Name = multipleCreateBucketName + + objects := []runtime.Object{ + cephCluster, + multipleCreateOBC, + multipleCreateOB, + bucketNotification1, + bucketNotification2, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 2, createNotificationCount) + assert.Equal(t, 0, deleteNotificationCount) + }) + + t.Run("delete multiple notifications from the obc", func(t *testing.T) { + resetValues() + multipleDeleteOBC, multipleDeleteOB := createOBResources(multipleDeleteBucketName) + multipleDeleteOBC.Spec.GenerateBucketName = multipleDeleteBucketName + multipleDeleteOBC.Spec.ObjectBucketName = multipleDeleteBucketName + multipleDeleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = multipleDeleteBucketName + objects := []runtime.Object{ + cephCluster, + multipleDeleteOBC, + multipleDeleteOB, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, createNotificationCount) + assert.Equal(t, 2, deleteNotificationCount) + }) + t.Run("provision OBC with multiple delete and create of notifications", func(t *testing.T) { + resetValues() + multipleBothOBC, multipleBothOB := createOBResources(multipleBothBucketName) + multipleBothOBC.Labels = setNotificationLabels([]string{testNotificationName + "-1", testNotificationName + "-2"}) + multipleBothOBC.Spec.ObjectBucketName = multipleBothBucketName + multipleBothOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + bucketNotification1 := createBucketNotification(testNotificationName + "-1") + bucketNotification2 := createBucketNotification(testNotificationName + "-2") + req.NamespacedName.Name = multipleBothBucketName + + objects := []runtime.Object{ + cephCluster, + multipleBothOBC, + multipleBothOB, + bucketNotification1, + bucketNotification2, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 2, createNotificationCount) + assert.Equal(t, 2, deleteNotificationCount) }) } diff --git a/pkg/operator/ceph/object/notification/provisioner.go b/pkg/operator/ceph/object/notification/provisioner.go index 5e9c6a3c9e379..5558085b4df4e 100644 --- a/pkg/operator/ceph/object/notification/provisioner.go +++ b/pkg/operator/ceph/object/notification/provisioner.go @@ -30,6 +30,7 @@ import ( "github.com/rook/rook/pkg/clusterd" cephclient "github.com/rook/rook/pkg/daemon/ceph/client" "github.com/rook/rook/pkg/operator/ceph/object" + "github.com/rook/rook/pkg/operator/ceph/object/bucket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -166,9 +167,38 @@ var createNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, t } // Allow overriding this function for unit tests -var deleteAllNotificationsFunc = deleteAllNotifications +var getAllNotificationsFunc = getAllRGWNotifications -var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { +var getAllRGWNotifications = func(p provisioner, ob *bktv1alpha1.ObjectBucket) ([]string, error) { + bucketName := ob.Spec.Endpoint.BucketName + ownerName := ob.Spec.AdditionalState[bucket.CephUser] + s3Agent, err := newS3Agent(p) + if err != nil { + return nil, errors.Wrapf(err, "failed to create S3 agent for CephBucketNotification provisioning for bucket %q", bucketName) + } + nc, err := s3Agent.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{ + Bucket: &bucketName, + ExpectedBucketOwner: &ownerName, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to get BucketNotification from bucket %q", bucketName) + } + + notificationList := make([]string, 0) + if nc != nil { + for _, tc := range nc.TopicConfigurations { + notificationList = append(notificationList, *tc.Id) + } + } + logger.Debugf("Bucket Notifications %q was listed for bucket %q", notificationList, bucketName) + + return notificationList, nil +} + +// Allow overriding this function for unit tests +var deleteNotificationFunc = deleteNotification + +var deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { bucketName := types.NamespacedName{Namespace: bucket.Namespace, Name: bucket.Name} s3Agent, err := newS3Agent(p) if err != nil { @@ -176,11 +206,11 @@ var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucke } if err := DeleteBucketNotification(s3Agent.Client, &DeleteBucketNotificationRequestInput{ Bucket: &bucket.Spec.Endpoint.BucketName, - }); err != nil { - return errors.Wrapf(err, "failed to delete all bucket notifications from bucket %q", bucketName) + }, notificationId); err != nil { + return errors.Wrapf(err, "failed to delete bucket notification %q from bucket %q", notificationId, bucketName) } - logger.Infof("all bucket notifications deleted from bucket %q", bucketName) + logger.Debugf("bucket notification %q deleted from bucket %q", notificationId, bucketName) return nil } diff --git a/pkg/operator/ceph/object/notification/s3ext.go b/pkg/operator/ceph/object/notification/s3ext.go index 7de5ce05f9b62..a481b1a1589a7 100644 --- a/pkg/operator/ceph/object/notification/s3ext.go +++ b/pkg/operator/ceph/object/notification/s3ext.go @@ -66,13 +66,16 @@ func (s *DeleteBucketNotificationRequestInput) Validate() error { const opDeleteBucketNotification = "DeleteBucketNotification" -func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput) *request.Request { +func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) *request.Request { op := &request.Operation{ Name: opDeleteBucketNotification, HTTPMethod: http.MethodDelete, HTTPPath: "/{Bucket}?notification", } + if len(notificationId) > 0 { + op.HTTPPath = "/{Bucket}?notification=" + notificationId + } if input == nil { input = &DeleteBucketNotificationRequestInput{} } @@ -80,7 +83,7 @@ func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRe return c.NewRequest(op, input, nil) } -func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput) error { - req := DeleteBucketNotificationRequest(c, input) +func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) error { + req := DeleteBucketNotificationRequest(c, input, notificationId) return req.Send() }