diff --git a/pkg/operator/ceph/object/notification/controller.go b/pkg/operator/ceph/object/notification/controller.go index 3e84a55a8309..60594a25e993 100644 --- a/pkg/operator/ceph/object/notification/controller.go +++ b/pkg/operator/ceph/object/notification/controller.go @@ -52,6 +52,7 @@ var logger = capnslog.NewPackageLogger("github.com/rook/rook", packageName) var waitForRequeueIfTopicNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} var waitForRequeueIfNotificationNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} var waitForRequeueIfObjectBucketNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} +var waitForRequeueIfNotificationNotDeleted = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} // ReconcileNotifications reconciles a CephbucketNotification type ReconcileNotifications struct { diff --git a/pkg/operator/ceph/object/notification/controller_test.go b/pkg/operator/ceph/object/notification/controller_test.go index 0859a2fe9b56..5c618a981ee1 100644 --- a/pkg/operator/ceph/object/notification/controller_test.go +++ b/pkg/operator/ceph/object/notification/controller_test.go @@ -43,30 +43,58 @@ import ( ) var ( - testTopicName = "topic-a" - testNotificationName = "notification-a" - testNamespace = "rook-ceph" - testStoreName = "test-store" - testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName - testBucketName = "my-bucket" - testSCName = "my-storage-class" + testTopicName = "topic-a" + testNotificationName = "notification" + testNamespace = "rook-ceph" + testStoreName = "test-store" + testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName + testBucketName = "my-bucket" + testSCName = "my-storage-class" + deleteBucketName = "delete" + noChangeBucketName = "nochange" + multipleCreateBucketName = "multi-create" + multipleDeleteBucketName = "multi-delete" + multipleBothBucketName = "multi-both" ) -var createWasInvoked bool +var getWasInvoked bool +var createdNotifications []string +var deletedNotifications []string +func resetValues() { + getWasInvoked = false + createdNotifications = nil + deletedNotifications = nil + +} func mockCleanup() { - createWasInvoked = false + resetValues() createNotificationFunc = createNotification - deleteAllNotificationsFunc = deleteAllNotifications + getAllNotificationsFunc = getAllRGWNotifications + deleteNotificationFunc = deleteNotification } func mockSetup() { - createWasInvoked = false createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error { - createWasInvoked = true + createdNotifications = append(createdNotifications, notification.Name) return nil } - deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { + getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) { + getWasInvoked = true + if bucket.Name == deleteBucketName { + return []string{deleteBucketName + testNotificationName}, nil + } + if bucket.Name == noChangeBucketName { + return []string{testNotificationName}, nil + } + if bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName { + return []string{multipleDeleteBucketName + testNotificationName + "-1", + multipleDeleteBucketName + testNotificationName + "-2"}, nil + } + return nil, nil + } + deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { + deletedNotifications = append(deletedNotifications, notificationId) return nil } } @@ -181,7 +209,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) { @@ -201,7 +229,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) { @@ -222,7 +250,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) { @@ -245,7 +273,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) t.Run("create notification and topic configuration", func(t *testing.T) { @@ -268,7 +296,7 @@ func TestCephBucketNotificationController(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) } @@ -404,7 +432,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.False(t, createWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) }) t.Run("provision notification when OB exists", func(t *testing.T) { @@ -448,7 +476,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { // notification configuration is set err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) - assert.True(t, createWasInvoked) + assert.Equal(t, 1, len(createdNotifications)) }) } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller.go b/pkg/operator/ceph/object/notification/obc_label_controller.go index 15f20a4c1af7..61e229e1f40d 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller.go @@ -30,6 +30,7 @@ import ( "github.com/rook/rook/pkg/operator/ceph/object/topic" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -127,7 +128,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res return opcontroller.WaitForRequeueIfCephClusterNotReady, nil } - // delete all existing notifications + // get all existing notifications p := provisioner{ context: r.context, clusterInfo: clusterInfo, @@ -136,11 +137,13 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res owner: ob.Spec.AdditionalState[bucket.CephUser], objectStoreName: objectStoreName, } - err = deleteAllNotificationsFunc(p, &ob) + bnList, err := getAllNotificationsFunc(p, &ob) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName) } + labelList := make([]string, 0) + deleteList := make([]string, 0) // looking for notifications in the labels for labelKey, labelValue := range obc.Labels { notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2) @@ -149,37 +152,64 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue) continue } - // for each notification label fetch the bucket notification CRD + labelList = append(labelList, labelValue) logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName) - notification := &cephv1.CephBucketNotification{} - bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue} - if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { - if kerrors.IsNotFound(err) { - logger.Infof("CephBucketNotification %q not found", bnName) - return waitForRequeueIfNotificationNotReady, nil - } - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) - } + } + } - // get the topic associated with the notification, and make sure it is provisioned - topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} - bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) - if err != nil { - logger.Infof("CephBucketTopic %q not provisioned yet", topicName) - return waitForRequeueIfTopicNotReady, nil - } + // remove notifications which are no longer specified in the OBC labels + for _, oldValue := range bnList { + if !sets.NewString(labelList...).Has(oldValue) { + deleteList = append(deleteList, oldValue) + } + } + retry := false + for _, notificationId := range deleteList { + err = deleteNotificationFunc(p, &ob, notificationId) + if err != nil { + logger.Errorf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err) + retry = true + } + } + if retry { + return waitForRequeueIfNotificationNotDeleted, nil + } + // add new notifications to the list + return r.addNewNotifications(p, ob, labelList, objectStoreName, obc.Namespace) +} - if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { - return reconcile.Result{}, err +func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) { + for _, labelValue := range notificationList { + // for each notification label fetch the bucket notification CRD + notification := &cephv1.CephBucketNotification{} + bnName := types.NamespacedName{Namespace: namespace, Name: labelValue} + if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { + if kerrors.IsNotFound(err) { + logger.Infof("CephBucketNotification %q not found in %q ", bnName.Name, bnName.Namespace) + return waitForRequeueIfNotificationNotReady, nil } + return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) + } - // provision the notification - err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) - if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) - } - logger.Infof("provisioned CephBucketNotification %q", bnName) + logger.Debugf("adding bucket notification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name) + // get the topic associated with the notification, and make sure it is provisioned + topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} + bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) + if err != nil { + logger.Infof("CephBucketTopic %q not provisioned yet in %q", topicName.Name, topicName.Namespace) + return waitForRequeueIfTopicNotReady, nil + } + + if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { + return reconcile.Result{}, err + } + + // provision the notification + err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) + if err != nil { + return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) } + logger.Infof("provisioned CephBucketNotification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name) } return reconcile.Result{}, nil diff --git a/pkg/operator/ceph/object/notification/obc_label_controller_test.go b/pkg/operator/ceph/object/notification/obc_label_controller_test.go index c7fbaf3620b2..2ca11f81f26d 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller_test.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller_test.go @@ -33,6 +33,7 @@ import ( "github.com/rook/rook/pkg/operator/k8sutil" exectest "github.com/rook/rook/pkg/util/exec/test" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -42,6 +43,69 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) +func createOBResources(name string) (*bktv1alpha1.ObjectBucketClaim, *bktv1alpha1.ObjectBucket) { + return &bktv1alpha1.ObjectBucketClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ObjectBucketClaim", + }, + Spec: bktv1alpha1.ObjectBucketClaimSpec{ + StorageClassName: testSCName, + GenerateBucketName: name, + }, + Status: bktv1alpha1.ObjectBucketClaimStatus{ + Phase: bktv1alpha1.ObjectBucketClaimStatusPhasePending, + }, + }, &bktv1alpha1.ObjectBucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ObjectBucket", + }, + Spec: bktv1alpha1.ObjectBucketSpec{ + StorageClassName: testSCName, + Connection: &bktv1alpha1.Connection{ + Endpoint: &bktv1alpha1.Endpoint{ + BucketHost: object.BuildDomainName(testStoreName, testNamespace), + }, + }, + ClaimRef: &corev1.ObjectReference{ + Name: name}, + }, + Status: bktv1alpha1.ObjectBucketStatus{ + Phase: bktv1alpha1.ObjectBucketStatusPhaseBound, + }, + } +} + +func createBucketNotification(name string) *cephv1.CephBucketNotification { + return &cephv1.CephBucketNotification{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: testNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "CephBucketNotification", + }, + Spec: cephv1.BucketNotificationSpec{ + Topic: testTopicName, + }, + } +} + +func setNotificationLabels(labelList []string) map[string]string { + var label = make(map[string]string) + for _, value := range labelList { + label[notificationLabelPrefix+value] = value + } + return label +} + func TestCephBucketNotificationOBCLabelController(t *testing.T) { mockSetup() defer mockCleanup() @@ -68,18 +132,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { }, Status: &cephv1.BucketTopicStatus{ARN: &testARN}, } - bucketNotification := &cephv1.CephBucketNotification{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNotificationName, - Namespace: testNamespace, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "CephBucketNotification", - }, - Spec: cephv1.BucketNotificationSpec{ - Topic: testTopicName, - }, - } + cephCluster := &cephv1.CephCluster{ ObjectMeta: metav1.ObjectMeta{ Name: testNamespace, @@ -92,45 +145,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { }, }, } - obc := &bktv1alpha1.ObjectBucketClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: testBucketName, - Namespace: testNamespace, - Labels: map[string]string{ - notificationLabelPrefix + testNotificationName: testNotificationName, - }, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "ObjectBucketClaim", - }, - Spec: bktv1alpha1.ObjectBucketClaimSpec{ - StorageClassName: testSCName, - GenerateBucketName: testBucketName, - }, - Status: bktv1alpha1.ObjectBucketClaimStatus{ - Phase: bktv1alpha1.ObjectBucketClaimStatusPhasePending, - }, - } - ob := &bktv1alpha1.ObjectBucket{ - ObjectMeta: metav1.ObjectMeta{ - Name: testBucketName, - Namespace: testNamespace, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "ObjectBucket", - }, - Spec: bktv1alpha1.ObjectBucketSpec{ - StorageClassName: testSCName, - Connection: &bktv1alpha1.Connection{ - Endpoint: &bktv1alpha1.Endpoint{ - BucketHost: object.BuildDomainName(testStoreName, testNamespace), - }, - }, - }, - Status: bktv1alpha1.ObjectBucketStatus{ - Phase: bktv1alpha1.ObjectBucketStatusPhaseBound, - }, - } + req := reconcile.Request{ NamespacedName: types.NamespacedName{ Name: testBucketName, @@ -173,10 +188,14 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { Type: k8sutil.RookType, } + obc, ob := createOBResources(testBucketName) + obc.Labels = setNotificationLabels([]string{testNotificationName}) + _, err := c.Clientset.CoreV1().Secrets(testNamespace).Create(ctx, secret, metav1.CreateOptions{}) assert.NoError(t, err) t.Run("provision OBC with notification label with no ob", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -189,10 +208,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.False(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 0, len(deletedNotifications)) }) t.Run("provision OBC with notification label with not ready ob", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -206,13 +228,16 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.False(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 0, len(deletedNotifications)) }) obc.Spec.ObjectBucketName = testBucketName obc.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound t.Run("provision OBC with notification label with no notification", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -226,10 +251,14 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 0, len(deletedNotifications)) }) + bucketNotification := createBucketNotification(testNotificationName) t.Run("provision OBC with notification label and notification with no topic", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -244,10 +273,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.True(t, res.Requeue) - assert.False(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 0, len(deletedNotifications)) }) t.Run("provision OBC with notification label", func(t *testing.T) { + resetValues() objects := []runtime.Object{ cephCluster, obc, @@ -263,6 +295,156 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { res, err := r.Reconcile(ctx, req) assert.NoError(t, err) assert.False(t, res.Requeue) - assert.True(t, createWasInvoked) + assert.True(t, getWasInvoked) + assert.Equal(t, 1, len(createdNotifications)) + assert.ElementsMatch(t, createdNotifications, []string{testNotificationName}) + assert.Equal(t, 0, len(deletedNotifications)) + }) + + t.Run("reconcile with already existing label for the obc", func(t *testing.T) { + resetValues() + noChangeOBC, noChangeOB := createOBResources(noChangeBucketName) + noChangeOBC.Labels = setNotificationLabels([]string{testNotificationName}) + noChangeOBC.Spec.ObjectBucketName = noChangeBucketName + noChangeOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = noChangeBucketName + objects := []runtime.Object{ + cephCluster, + noChangeOBC, + noChangeOB, + bucketNotification, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 1, len(createdNotifications)) + assert.ElementsMatch(t, createdNotifications, []string{testNotificationName}) + assert.Equal(t, 0, len(deletedNotifications)) + }) + + t.Run("delete notification from the obc", func(t *testing.T) { + resetValues() + deleteOBC, deleteOB := createOBResources(deleteBucketName) + deleteOBC.Spec.GenerateBucketName = deleteBucketName + deleteOBC.Spec.ObjectBucketName = deleteBucketName + deleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = deleteBucketName + objects := []runtime.Object{ + cephCluster, + deleteOBC, + deleteOB, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 1, len(deletedNotifications)) + assert.ElementsMatch(t, deletedNotifications, + []string{deleteBucketName + testNotificationName}) + }) + + t.Run("provision OBC with multiple notification labels", func(t *testing.T) { + resetValues() + multipleCreateOBC, multipleCreateOB := createOBResources(multipleCreateBucketName) + multipleCreateOBC.Labels = setNotificationLabels([]string{testNotificationName + "-1", testNotificationName + "-2"}) + multipleCreateOBC.Spec.ObjectBucketName = multipleCreateBucketName + multipleCreateOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + bucketNotification1 := createBucketNotification(testNotificationName + "-1") + bucketNotification2 := createBucketNotification(testNotificationName + "-2") + req.NamespacedName.Name = multipleCreateBucketName + + objects := []runtime.Object{ + cephCluster, + multipleCreateOBC, + multipleCreateOB, + bucketNotification1, + bucketNotification2, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 2, len(createdNotifications)) + assert.ElementsMatch(t, createdNotifications, []string{testNotificationName + "-1", testNotificationName + "-2"}) + assert.Equal(t, 0, len(deletedNotifications)) + }) + + t.Run("delete multiple notifications from the obc", func(t *testing.T) { + resetValues() + multipleDeleteOBC, multipleDeleteOB := createOBResources(multipleDeleteBucketName) + multipleDeleteOBC.Spec.GenerateBucketName = multipleDeleteBucketName + multipleDeleteOBC.Spec.ObjectBucketName = multipleDeleteBucketName + multipleDeleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + req.NamespacedName.Name = multipleDeleteBucketName + objects := []runtime.Object{ + cephCluster, + multipleDeleteOBC, + multipleDeleteOB, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 0, len(createdNotifications)) + assert.Equal(t, 2, len(deletedNotifications)) + assert.ElementsMatch(t, deletedNotifications, + []string{multipleDeleteBucketName + testNotificationName + "-1", multipleDeleteBucketName + testNotificationName + "-2"}) + }) + t.Run("provision OBC with multiple delete and create of notifications", func(t *testing.T) { + resetValues() + multipleBothOBC, multipleBothOB := createOBResources(multipleBothBucketName) + multipleBothOBC.Labels = setNotificationLabels([]string{testNotificationName + "-1", testNotificationName + "-2"}) + multipleBothOBC.Spec.ObjectBucketName = multipleBothBucketName + multipleBothOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound + bucketNotification1 := createBucketNotification(testNotificationName + "-1") + bucketNotification2 := createBucketNotification(testNotificationName + "-2") + req.NamespacedName.Name = multipleBothBucketName + + objects := []runtime.Object{ + cephCluster, + multipleBothOBC, + multipleBothOB, + bucketNotification1, + bucketNotification2, + bucketTopic, + } + + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() + + r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} + + res, err := r.Reconcile(ctx, req) + assert.NoError(t, err) + assert.False(t, res.Requeue) + assert.True(t, getWasInvoked) + assert.Equal(t, 2, len(createdNotifications)) + assert.ElementsMatch(t, createdNotifications, []string{testNotificationName + "-1", testNotificationName + "-2"}) + assert.Equal(t, 2, len(deletedNotifications)) + assert.ElementsMatch(t, deletedNotifications, + []string{multipleDeleteBucketName + testNotificationName + "-1", multipleDeleteBucketName + testNotificationName + "-2"}) }) } diff --git a/pkg/operator/ceph/object/notification/provisioner.go b/pkg/operator/ceph/object/notification/provisioner.go index 5e9c6a3c9e37..5558085b4df4 100644 --- a/pkg/operator/ceph/object/notification/provisioner.go +++ b/pkg/operator/ceph/object/notification/provisioner.go @@ -30,6 +30,7 @@ import ( "github.com/rook/rook/pkg/clusterd" cephclient "github.com/rook/rook/pkg/daemon/ceph/client" "github.com/rook/rook/pkg/operator/ceph/object" + "github.com/rook/rook/pkg/operator/ceph/object/bucket" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -166,9 +167,38 @@ var createNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, t } // Allow overriding this function for unit tests -var deleteAllNotificationsFunc = deleteAllNotifications +var getAllNotificationsFunc = getAllRGWNotifications -var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error { +var getAllRGWNotifications = func(p provisioner, ob *bktv1alpha1.ObjectBucket) ([]string, error) { + bucketName := ob.Spec.Endpoint.BucketName + ownerName := ob.Spec.AdditionalState[bucket.CephUser] + s3Agent, err := newS3Agent(p) + if err != nil { + return nil, errors.Wrapf(err, "failed to create S3 agent for CephBucketNotification provisioning for bucket %q", bucketName) + } + nc, err := s3Agent.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{ + Bucket: &bucketName, + ExpectedBucketOwner: &ownerName, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to get BucketNotification from bucket %q", bucketName) + } + + notificationList := make([]string, 0) + if nc != nil { + for _, tc := range nc.TopicConfigurations { + notificationList = append(notificationList, *tc.Id) + } + } + logger.Debugf("Bucket Notifications %q was listed for bucket %q", notificationList, bucketName) + + return notificationList, nil +} + +// Allow overriding this function for unit tests +var deleteNotificationFunc = deleteNotification + +var deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error { bucketName := types.NamespacedName{Namespace: bucket.Namespace, Name: bucket.Name} s3Agent, err := newS3Agent(p) if err != nil { @@ -176,11 +206,11 @@ var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucke } if err := DeleteBucketNotification(s3Agent.Client, &DeleteBucketNotificationRequestInput{ Bucket: &bucket.Spec.Endpoint.BucketName, - }); err != nil { - return errors.Wrapf(err, "failed to delete all bucket notifications from bucket %q", bucketName) + }, notificationId); err != nil { + return errors.Wrapf(err, "failed to delete bucket notification %q from bucket %q", notificationId, bucketName) } - logger.Infof("all bucket notifications deleted from bucket %q", bucketName) + logger.Debugf("bucket notification %q deleted from bucket %q", notificationId, bucketName) return nil } diff --git a/pkg/operator/ceph/object/notification/s3ext.go b/pkg/operator/ceph/object/notification/s3ext.go index 7de5ce05f9b6..a481b1a1589a 100644 --- a/pkg/operator/ceph/object/notification/s3ext.go +++ b/pkg/operator/ceph/object/notification/s3ext.go @@ -66,13 +66,16 @@ func (s *DeleteBucketNotificationRequestInput) Validate() error { const opDeleteBucketNotification = "DeleteBucketNotification" -func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput) *request.Request { +func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) *request.Request { op := &request.Operation{ Name: opDeleteBucketNotification, HTTPMethod: http.MethodDelete, HTTPPath: "/{Bucket}?notification", } + if len(notificationId) > 0 { + op.HTTPPath = "/{Bucket}?notification=" + notificationId + } if input == nil { input = &DeleteBucketNotificationRequestInput{} } @@ -80,7 +83,7 @@ func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRe return c.NewRequest(op, input, nil) } -func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput) error { - req := DeleteBucketNotificationRequest(c, input) +func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) error { + req := DeleteBucketNotificationRequest(c, input, notificationId) return req.Send() }