Skip to content

Commit

Permalink
object: fix handling for notifications for OBC
Browse files Browse the repository at this point in the history
Current approach is to delete all the existing notifcations from bucket
and re-add the notification from the labels.

Signed-off-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
  • Loading branch information
thotz committed Dec 9, 2021
1 parent d40e6d7 commit f1dee39
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 38 deletions.
20 changes: 17 additions & 3 deletions pkg/operator/ceph/object/notification/controller_test.go
Expand Up @@ -52,12 +52,15 @@ var (
testSCName = "my-storage-class"
)

var createWasInvoked bool
var createWasInvoked, getWasInvoked, deleteWasInvoked bool

func mockCleanup() {
createWasInvoked = false
getWasInvoked = false
deleteWasInvoked = false
createNotificationFunc = createNotification
deleteAllNotificationsFunc = deleteAllNotifications
getAllNotificationsFunc = getAllNotifications
deleteNotificationFunc = deleteNotification
}

func mockSetup() {
Expand All @@ -66,7 +69,18 @@ func mockSetup() {
createWasInvoked = true
return nil
}
deleteAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error {
getAllNotificationsFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) ([]string, error) {
getWasInvoked = true
if bucket.Name == "delete" {
return []string{"delete" + testNotificationName}, nil
}
if bucket.Name == "nochange" {
return []string{testNotificationName}, nil
}
return nil, nil
}
deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error {
deleteWasInvoked = true
return nil
}
}
Expand Down
91 changes: 64 additions & 27 deletions pkg/operator/ceph/object/notification/obc_label_controller.go
Expand Up @@ -127,7 +127,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}

// delete all existing notifications
// get all existing notifications
p := provisioner{
context: r.context,
clusterInfo: clusterInfo,
Expand All @@ -136,11 +136,14 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
owner: ob.Spec.AdditionalState[bucket.CephUser],
objectStoreName: objectStoreName,
}
err = deleteAllNotificationsFunc(p, &ob)
bnList, err := getAllNotificationsFunc(p, &ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName)
}

labelList := make([]string, 0)
addList := make([]string, 0)
deleteList := make([]string, 0)
// looking for notifications in the labels
for labelKey, labelValue := range obc.Labels {
notifyLabels := strings.SplitAfterN(labelKey, notificationLabelPrefix, 2)
Expand All @@ -149,36 +152,70 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
logger.Warningf("bucket notification label mismatch. ignoring key %q value %q", labelKey, labelValue)
continue
}
// for each notification label fetch the bucket notification CRD
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
labelList = append(labelList, labelValue)
found := false
for _, existingValue := range bnList {
if labelValue == existingValue {
found = true
break
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
// add it into new list for creating notifications for the bucket
if !found {
addList = append(addList, labelValue)
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}
}
// remove notifications which are no longer specified in the OBC labels
for _, oldValue := range bnList {
found := false
for _, newValue := range labelList {
if oldValue == newValue {
found = true
break
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
if !found {
deleteList = append(deleteList, oldValue)
}
}
for _, labelValue := range addList {
// for each notification label fetch the bucket notification CRD
logger.Debugf("bucket notification label %q found on ObjectbucketClaim %q", labelValue, bucketName)
notification := &cephv1.CephBucketNotification{}
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
}

// provision the notification
err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Infof("provisioned CephBucketNotification %q", bnName)
}

for _, notificationId := range deleteList {
err = deleteNotificationFunc(p, &ob, notificationId)
if err != nil {
logger.Warningf("notification %q failed remove from %q, returned error %v", notificationId, ob.Spec.Endpoint.BucketName, err)
}
}

Expand Down
60 changes: 60 additions & 0 deletions pkg/operator/ceph/object/notification/obc_label_controller_test.go
Expand Up @@ -189,7 +189,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.False(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

t.Run("provision OBC with notification label with not ready ob", func(t *testing.T) {
Expand All @@ -206,7 +208,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.False(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

obc.Spec.ObjectBucketName = testBucketName
Expand All @@ -226,7 +230,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.True(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

t.Run("provision OBC with notification label and notification with no topic", func(t *testing.T) {
Expand All @@ -244,7 +250,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.True(t, res.Requeue)
assert.True(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

t.Run("provision OBC with notification label", func(t *testing.T) {
Expand All @@ -263,6 +271,58 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) {
res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.False(t, res.Requeue)
assert.True(t, getWasInvoked)
assert.True(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

t.Run("reconcile with already exisitng label for the obc", func(t *testing.T) {
nochangeOB := ob.DeepCopy()
nochangeOB.Name = "nochange"
nochangeOBC := obc.DeepCopy()
nochangeOBC.Name = "nochange"
objects := []runtime.Object{
cephCluster,
nochangeOBC,
nochangeOB,
bucketNotification,
bucketTopic,
}

cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()

r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx}

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.False(t, res.Requeue)
assert.True(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.False(t, deleteWasInvoked)
})

t.Run("delete notification for the obc", func(t *testing.T) {
deleteOB := ob.DeepCopy()
deleteOB.Name = "delete"
deleteOBC := obc.DeepCopy()
deleteOBC.Name = "delete"
objects := []runtime.Object{
cephCluster,
deleteOBC,
deleteOB,
bucketNotification,
bucketTopic,
}

cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()

r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx}

res, err := r.Reconcile(ctx, req)
assert.NoError(t, err)
assert.False(t, res.Requeue)
assert.True(t, getWasInvoked)
assert.False(t, createWasInvoked)
assert.True(t, deleteWasInvoked)
})
}
40 changes: 35 additions & 5 deletions pkg/operator/ceph/object/notification/provisioner.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rook/rook/pkg/clusterd"
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/pkg/operator/ceph/object/bucket"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -166,21 +167,50 @@ var createNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, t
}

// Allow overriding this function for unit tests
var deleteAllNotificationsFunc = deleteAllNotifications
var getAllNotificationsFunc = getAllNotifications

var deleteAllNotifications = func(p provisioner, bucket *bktv1alpha1.ObjectBucket) error {
var getAllNotifications = func(p provisioner, ob *bktv1alpha1.ObjectBucket) ([]string, error) {
bucketName := ob.Spec.Endpoint.BucketName
ownerName := ob.Spec.AdditionalState[bucket.CephUser]
s3Agent, err := newS3Agent(p)
if err != nil {
return nil, errors.Wrapf(err, "failed to create S3 agent for CephBucketNotification provisioning for bucket %q", bucketName)
}
nc, err := s3Agent.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketName,
ExpectedBucketOwner: &ownerName,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to get BucketNotification from bucket %q", bucketName)
}

notificationList := make([]string, 0)
if nc != nil {
for _, tc := range nc.TopicConfigurations {
notificationList = append(notificationList, *tc.Id)
}
}
logger.Debugf("Bucket Notifications %q was listed for bucket %q", notificationList, bucketName)

return notificationList, nil
}

// Allow overriding this function for unit tests
var deleteNotificationFunc = deleteNotification

var deleteNotification = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, notificationId string) error {
bucketName := types.NamespacedName{Namespace: bucket.Namespace, Name: bucket.Name}
s3Agent, err := newS3Agent(p)
if err != nil {
return errors.Wrapf(err, "failed to create S3 agent for deleting all bucket notifications from bucket %q", bucketName)
}
if err := DeleteBucketNotification(s3Agent.Client, &DeleteBucketNotificationRequestInput{
Bucket: &bucket.Spec.Endpoint.BucketName,
}); err != nil {
return errors.Wrapf(err, "failed to delete all bucket notifications from bucket %q", bucketName)
}, notificationId); err != nil {
return errors.Wrapf(err, "failed to delete bucket notification %q from bucket %q", notificationId, bucketName)
}

logger.Infof("all bucket notifications deleted from bucket %q", bucketName)
logger.Debugf("bucket notification %q deleted from bucket %q", notificationId, bucketName)

return nil
}
9 changes: 6 additions & 3 deletions pkg/operator/ceph/object/notification/s3ext.go
Expand Up @@ -66,21 +66,24 @@ func (s *DeleteBucketNotificationRequestInput) Validate() error {

const opDeleteBucketNotification = "DeleteBucketNotification"

func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput) *request.Request {
func DeleteBucketNotificationRequest(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) *request.Request {
op := &request.Operation{
Name: opDeleteBucketNotification,
HTTPMethod: http.MethodDelete,
HTTPPath: "/{Bucket}?notification",
}

if len(notificationId) > 0 {
op.HTTPPath = "/{Bucket}?notification=" + notificationId
}
if input == nil {
input = &DeleteBucketNotificationRequestInput{}
}

return c.NewRequest(op, input, nil)
}

func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput) error {
req := DeleteBucketNotificationRequest(c, input)
func DeleteBucketNotification(c *s3.S3, input *DeleteBucketNotificationRequestInput, notificationId string) error {
req := DeleteBucketNotificationRequest(c, input, notificationId)
return req.Send()
}

0 comments on commit f1dee39

Please sign in to comment.