Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object: fix handling for notifications for OBC #9365

Merged
merged 1 commit into from Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -52,6 +52,7 @@ var logger = capnslog.NewPackageLogger("github.com/rook/rook", packageName)
var waitForRequeueIfTopicNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfNotificationNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfObjectBucketNotReady = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}
var waitForRequeueIfNotificationNotDeleted = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}

// ReconcileNotifications reconciles a CephbucketNotification
type ReconcileNotifications struct {
Expand Down
68 changes: 48 additions & 20 deletions pkg/operator/ceph/object/notification/controller_test.go
Expand Up @@ -43,30 +43,58 @@ import (
)

var (
testTopicName = "topic-a"
testNotificationName = "notification-a"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
testTopicName = "topic-a"
testNotificationName = "notification"
testNamespace = "rook-ceph"
testStoreName = "test-store"
testARN = "arn:aws:sns:" + testStoreName + "::" + testTopicName
testBucketName = "my-bucket"
testSCName = "my-storage-class"
deleteBucketName = "delete"
noChangeBucketName = "nochange"
multipleCreateBucketName = "multi-create"
multipleDeleteBucketName = "multi-delete"
multipleBothBucketName = "multi-both"
)

var createWasInvoked bool
var getWasInvoked bool
var createdNotifications []string
var deletedNotifications []string

func resetValues() {
getWasInvoked = false
createdNotifications = nil
deletedNotifications = nil

}
func mockCleanup() {
createWasInvoked = false
resetValues()
createNotificationFunc = createNotification
deleteAllNotificationsFunc = deleteAllNotifications
getAllNotificationsFunc = getAllRGWNotifications
deleteNotificationFunc = deleteNotification
}

func mockSetup() {
createWasInvoked = false
createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error {
createWasInvoked = true
createdNotifications = append(createdNotifications, notification.Name)
return nil
}
deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error {
getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) {
getWasInvoked = true
if bucket.Name == deleteBucketName {
return []string{deleteBucketName + testNotificationName}, nil
}
if bucket.Name == noChangeBucketName {
return []string{testNotificationName}, nil
}
if bucket.Name == multipleDeleteBucketName || bucket.Name == multipleBothBucketName {
return []string{multipleDeleteBucketName + testNotificationName + "-1",
multipleDeleteBucketName + testNotificationName + "-2"}, nil
}
return nil, nil
}
deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error {
deletedNotifications = append(deletedNotifications, notificationId)
return nil
}
}
Expand Down Expand Up @@ -181,7 +209,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) {
Expand All @@ -201,7 +229,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) {
Expand All @@ -222,7 +250,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) {
Expand All @@ -245,7 +273,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("create notification and topic configuration", func(t *testing.T) {
Expand All @@ -268,7 +296,7 @@ func TestCephBucketNotificationController(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})
}

Expand Down Expand Up @@ -404,7 +432,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.False(t, createWasInvoked)
assert.Equal(t, 0, len(createdNotifications))
})

t.Run("provision notification when OB exists", func(t *testing.T) {
Expand Down Expand Up @@ -448,7 +476,7 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) {
// notification configuration is set
err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification)
assert.NoError(t, err, bucketNotification)
assert.True(t, createWasInvoked)
assert.Equal(t, 1, len(createdNotifications))
})
}

Expand Down
84 changes: 57 additions & 27 deletions pkg/operator/ceph/object/notification/obc_label_controller.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rook/rook/pkg/operator/ceph/object/topic"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}

// delete all existing notifications
// get all existing notifications
p := provisioner{
context: r.context,
clusterInfo: clusterInfo,
Expand All @@ -136,11 +137,13 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
owner: ob.Spec.AdditionalState[bucket.CephUser],
objectStoreName: objectStoreName,
}
err = deleteAllNotificationsFunc(p, &ob)
bnList, err := getAllNotificationsFunc(p, &ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName)
}

labelList := make([]string, 0)
deleteList := make([]string, 0)
// looking for notifications in the labels
for labelKey, labelValue := range obc.Labels {
thotz marked this conversation as resolved.
Show resolved Hide resolved
notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2)
Expand All @@ -149,37 +152,64 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue)
continue
}
// for each notification label fetch the bucket notification CRD
labelList = append(labelList, labelValue)
thotz marked this conversation as resolved.
Show resolved Hide resolved
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}
}
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}
// remove notifications which are no longer specified in the OBC labels
for _, oldValue := range bnList {
thotz marked this conversation as resolved.
Show resolved Hide resolved
if !sets.NewString(labelList...).Has(oldValue) {
deleteList = append(deleteList, oldValue)
}
}
retry := false
for _, notificationId := range deleteList {
err = deleteNotificationFunc(p, &ob, notificationId)
if err != nil {
logger.Errorf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err)
retry = true
}
}
if retry {
return waitForRequeueIfNotificationNotDeleted, nil
}
// add new notifications to the list
return r.addNewNotifications(p, ob, labelList, objectStoreName, obc.Namespace)
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) {
for _, labelValue := range notificationList {
// for each notification label fetch the bucket notification CRD
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found in %q ", bnName.Name, bnName.Namespace)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
logger.Debugf("adding bucket notification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name)
// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet in %q", topicName.Name, topicName.Namespace)
return waitForRequeueIfTopicNotReady, nil
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name)
}

return reconcile.Result{}, nil
Expand Down