Skip to content

Commit

Permalink
Merge pull request #9521 from rook/mergify/bp/release-1.8/pr-9400
Browse files Browse the repository at this point in the history
rgw: add events to CephBucketNotification CR (backport #9400)
  • Loading branch information
mergify[bot] committed Jan 4, 2022
2 parents f5bcc01 + e13d8c1 commit f406ff2
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 327 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/ceph.rook.io/v1/types.go
Expand Up @@ -416,6 +416,8 @@ const (
ReconcileSucceeded ConditionReason = "ReconcileSucceeded"
// ReconcileFailed represents when a resource reconciliation failed.
ReconcileFailed ConditionReason = "ReconcileFailed"
// ReconcileStarted represents when a resource reconciliation started.
ReconcileStarted ConditionReason = "ReconcileStarted"

// DeletingReason represents when Rook has detected a resource object should be deleted.
DeletingReason ConditionReason = "Deleting"
Expand Down
60 changes: 33 additions & 27 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -32,8 +32,12 @@ import (
"github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/pkg/operator/ceph/object/bucket"
"github.com/rook/rook/pkg/operator/ceph/object/topic"
"github.com/rook/rook/pkg/operator/ceph/reporting"
kapiv1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -59,6 +63,7 @@ type ReconcileNotifications struct {
client client.Client
context *clusterd.Context
opManagerContext context.Context
recorder record.EventRecorder
}

// Add creates a new CephBucketNotification controller and a new ObjectBucketClaim Controller and adds it to the Manager.
Expand All @@ -68,6 +73,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex
client: mgr.GetClient(),
context: context,
opManagerContext: opManagerContext,
recorder: mgr.GetEventRecorderFor(controllerName),
}); err != nil {
return err
}
Expand All @@ -76,6 +82,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex
client: mgr.GetClient(),
context: context,
opManagerContext: opManagerContext,
recorder: mgr.GetEventRecorderFor(controllerName),
})
}

Expand All @@ -100,86 +107,84 @@ func addNotificationReconciler(mgr manager.Manager, r reconcile.Reconciler) erro
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileNotifications) Reconcile(context context.Context, request reconcile.Request) (reconcile.Result, error) {
reconcileResponse, err := r.reconcile(request)
reconcileResponse, notification, err := r.reconcile(request)
if err != nil {
logger.Errorf("failed to reconcile %v", err)
}

return reconcileResponse, err
return reporting.ReportReconcileResult(logger, r.recorder, notification, reconcileResponse, err)
}

func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, error) {
func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, *cephv1.CephBucketNotification, error) {
// fetch the CephBucketNotification instance
notification := &cephv1.CephBucketNotification{}
notification := &cephv1.CephBucketNotification{ObjectMeta: metav1.ObjectMeta{Name: request.Name, Namespace: request.Namespace}}
bnName := request.NamespacedName
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileStarted), "Started reconciling CephBucketNotification %q", bnName)
err := r.client.Get(r.opManagerContext, request.NamespacedName, notification)
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", request.NamespacedName)
return reconcile.Result{}, nil
logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", bnName)
return reconcile.Result{}, notification, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", request.NamespacedName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// DELETE: the CR was deleted
if !notification.GetDeletionTimestamp().IsZero() {
logger.Debugf("CephBucketNotification %q was deleted", notification.Name)

logger.Debugf("CephBucketNotification %q was deleted", bnName)
// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
return waitForRequeueIfTopicNotReady, notification, errors.Wrapf(err, "topic %q not provisioned yet", topicName)
}

// Populate clusterInfo during each reconcile
clusterInfo, clusterSpec, err := getReadyCluster(r.client, r.opManagerContext, *r.context, bucketTopic.Spec.ObjectStoreNamespace)
if err != nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, errors.Wrapf(err, "cluster is not ready")
return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.Wrapf(err, "cluster is not ready")
}
if clusterInfo == nil || clusterSpec == nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.New("cluster is not ready")
}

// fetch all OBCs that has a label matching this CephBucketNotification
namespace := notification.Namespace
bnName := types.NamespacedName{Namespace: namespace, Name: notification.Name}
namespaceListOpt := client.InNamespace(namespace)
namespaceListOpt := client.InNamespace(notification.Namespace)
labelListOpt := client.MatchingLabels{
notificationLabelPrefix + notification.Name: notification.Name,
}
obcList := &bktv1alpha1.ObjectBucketClaimList{}
err = r.client.List(r.opManagerContext, obcList, namespaceListOpt, labelListOpt)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName)
}
if len(obcList.Items) == 0 {
logger.Debugf("no ObjectbucketClaim associated with CephBucketNotification %q", bnName)
return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

// loop through all OBCs in the list and get their OBs
for _, obc := range obcList.Items {
if obc.Spec.ObjectBucketName == "" {
logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
return waitForRequeueIfObjectBucketNotReady, nil
return waitForRequeueIfObjectBucketNotReady, notification, errors.Errorf("ObjectBucketClaim %q did not create the bucket yet",
types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
}
ob := bktv1alpha1.ObjectBucket{}
bucketName := types.NamespacedName{Namespace: namespace, Name: obc.Spec.ObjectBucketName}
bucketName := types.NamespacedName{Namespace: notification.Namespace, Name: obc.Spec.ObjectBucketName}
if err := r.client.Get(r.opManagerContext, bucketName, &ob); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName)
}
objectStoreName, err := getCephObjectStoreName(ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName)
}
if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
return reconcile.Result{}, notification, err
}

err = createNotificationFunc(
Expand All @@ -196,11 +201,12 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
notification,
)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to provision notification for ObjectBucketClaims %q", bucketName)
}
logger.Infof("provisioned CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
}

return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (types.NamespacedName, error) {
Expand Down

0 comments on commit f406ff2

Please sign in to comment.