Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw: add events to CephBucketNotification CR #9400

Merged
merged 1 commit into from Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/apis/ceph.rook.io/v1/types.go
Expand Up @@ -413,6 +413,8 @@ const (
ReconcileSucceeded ConditionReason = "ReconcileSucceeded"
// ReconcileFailed represents when a resource reconciliation failed.
ReconcileFailed ConditionReason = "ReconcileFailed"
// ReconcileStarted represents when a resource reconciliation started.
ReconcileStarted ConditionReason = "ReconcileStarted"

// DeletingReason represents when Rook has detected a resource object should be deleted.
DeletingReason ConditionReason = "Deleting"
Expand Down
60 changes: 33 additions & 27 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -32,8 +32,12 @@ import (
"github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/pkg/operator/ceph/object/bucket"
"github.com/rook/rook/pkg/operator/ceph/object/topic"
"github.com/rook/rook/pkg/operator/ceph/reporting"
kapiv1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -59,6 +63,7 @@ type ReconcileNotifications struct {
client client.Client
context *clusterd.Context
opManagerContext context.Context
recorder record.EventRecorder
}

// Add creates a new CephBucketNotification controller and a new ObjectBucketClaim Controller and adds it to the Manager.
Expand All @@ -68,6 +73,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex
client: mgr.GetClient(),
context: context,
opManagerContext: opManagerContext,
recorder: mgr.GetEventRecorderFor(controllerName),
}); err != nil {
return err
}
Expand All @@ -76,6 +82,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex
client: mgr.GetClient(),
context: context,
opManagerContext: opManagerContext,
recorder: mgr.GetEventRecorderFor(controllerName),
})
}

Expand All @@ -100,86 +107,84 @@ func addNotificationReconciler(mgr manager.Manager, r reconcile.Reconciler) erro
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileNotifications) Reconcile(context context.Context, request reconcile.Request) (reconcile.Result, error) {
reconcileResponse, err := r.reconcile(request)
reconcileResponse, notification, err := r.reconcile(request)
if err != nil {
logger.Errorf("failed to reconcile %v", err)
}

return reconcileResponse, err
return reporting.ReportReconcileResult(logger, r.recorder, notification, reconcileResponse, err)
}

func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, error) {
func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, *cephv1.CephBucketNotification, error) {
// fetch the CephBucketNotification instance
notification := &cephv1.CephBucketNotification{}
notification := &cephv1.CephBucketNotification{ObjectMeta: metav1.ObjectMeta{Name: request.Name, Namespace: request.Namespace}}
bnName := request.NamespacedName
r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileStarted), "Started reconciling CephBucketNotification %q", bnName)
err := r.client.Get(r.opManagerContext, request.NamespacedName, notification)
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", request.NamespacedName)
return reconcile.Result{}, nil
logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", bnName)
return reconcile.Result{}, notification, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", request.NamespacedName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
}

// DELETE: the CR was deleted
if !notification.GetDeletionTimestamp().IsZero() {
logger.Debugf("CephBucketNotification %q was deleted", notification.Name)

logger.Debugf("CephBucketNotification %q was deleted", bnName)
// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
return waitForRequeueIfTopicNotReady, notification, errors.Wrapf(err, "topic %q not provisioned yet", topicName)
}

// Populate clusterInfo during each reconcile
clusterInfo, clusterSpec, err := getReadyCluster(r.client, r.opManagerContext, *r.context, bucketTopic.Spec.ObjectStoreNamespace)
if err != nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, errors.Wrapf(err, "cluster is not ready")
return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.Wrapf(err, "cluster is not ready")
}
if clusterInfo == nil || clusterSpec == nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.New("cluster is not ready")
}

// fetch all OBCs that has a label matching this CephBucketNotification
namespace := notification.Namespace
bnName := types.NamespacedName{Namespace: namespace, Name: notification.Name}
namespaceListOpt := client.InNamespace(namespace)
namespaceListOpt := client.InNamespace(notification.Namespace)
labelListOpt := client.MatchingLabels{
notificationLabelPrefix + notification.Name: notification.Name,
}
obcList := &bktv1alpha1.ObjectBucketClaimList{}
err = r.client.List(r.opManagerContext, obcList, namespaceListOpt, labelListOpt)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName)
}
if len(obcList.Items) == 0 {
logger.Debugf("no ObjectbucketClaim associated with CephBucketNotification %q", bnName)
return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

// loop through all OBCs in the list and get their OBs
for _, obc := range obcList.Items {
if obc.Spec.ObjectBucketName == "" {
logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
return waitForRequeueIfObjectBucketNotReady, nil
return waitForRequeueIfObjectBucketNotReady, notification, errors.Errorf("ObjectBucketClaim %q did not create the bucket yet",
types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
}
ob := bktv1alpha1.ObjectBucket{}
bucketName := types.NamespacedName{Namespace: namespace, Name: obc.Spec.ObjectBucketName}
bucketName := types.NamespacedName{Namespace: notification.Namespace, Name: obc.Spec.ObjectBucketName}
if err := r.client.Get(r.opManagerContext, bucketName, &ob); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName)
}
objectStoreName, err := getCephObjectStoreName(ob)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName)
}
if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
return reconcile.Result{}, err
return reconcile.Result{}, notification, err
}

err = createNotificationFunc(
Expand All @@ -196,11 +201,12 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
notification,
)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
return reconcile.Result{}, notification, errors.Wrapf(err, "failed to provision notification for ObjectBucketClaims %q", bucketName)
}
logger.Infof("provisioned CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
}

return reconcile.Result{}, nil
return reconcile.Result{}, notification, nil
}

func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (types.NamespacedName, error) {
Expand Down