diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index c3deafc7136e3..20a4d0dcb7165 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -4771,6 +4771,73 @@ status: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c + helm.sh/resource-policy: keep + creationTimestamp: null + name: cephfilesystemsubvolumegroups.ceph.rook.io +spec: + group: ceph.rook.io + names: + kind: CephFilesystemSubVolumeGroup + listKind: CephFilesystemSubVolumeGroupList + plural: cephfilesystemsubvolumegroups + singular: cephfilesystemsubvolumegroup + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: CephFilesystemSubVolumeGroup represents a Ceph Filesystem SubVolumeGroup + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup + properties: + filesystemName: + description: FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of the CephFilesystem CR. If not coming from the CephFilesystem CR, it can be retrieved from the list of Ceph Filesystem volumes with `ceph fs volume ls`. To learn more about Ceph Filesystem abstractions see https://docs.ceph.com/en/latest/cephfs/fs-volumes/#fs-volumes-and-subvolumes + type: string + required: + - filesystemName + type: object + status: + description: Status represents the status of a CephFilesystem SubvolumeGroup + properties: + info: + additionalProperties: + type: string + nullable: true + type: object + phase: + description: ConditionType represent a resource's status + type: string + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - metadata + - spec + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c @@ -6001,73 +6068,6 @@ status: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c - helm.sh/resource-policy: keep - creationTimestamp: null - name: cephfilesystemsubvolumegroups.ceph.rook.io -spec: - group: ceph.rook.io - names: - kind: CephFilesystemSubVolumeGroup - listKind: CephFilesystemSubVolumeGroupList - plural: cephfilesystemsubvolumegroups - singular: cephfilesystemsubvolumegroup - scope: Namespaced - versions: - - name: v1 - schema: - openAPIV3Schema: - description: CephFilesystemSubVolumeGroup represents a Ceph Filesystem SubVolumeGroup - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup - properties: - filesystemName: - description: FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of the CephFilesystem CR. If not coming from the CephFilesystem CR, it can be retrieved from the list of Ceph Filesystem volumes with `ceph fs volume ls`. To learn more about Ceph Filesystem abstractions see https://docs.ceph.com/en/latest/cephfs/fs-volumes/#fs-volumes-and-subvolumes - type: string - required: - - filesystemName - type: object - status: - description: Status represents the status of a CephFilesystem SubvolumeGroup - properties: - info: - additionalProperties: - type: string - nullable: true - type: object - phase: - description: ConditionType represent a resource's status - type: string - type: object - x-kubernetes-preserve-unknown-fields: true - required: - - metadata - - spec - type: object - served: true - storage: true - subresources: - status: {} -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index 485697db8e0ee..e82c512382c17 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -4769,6 +4769,72 @@ status: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c + creationTimestamp: null + name: cephfilesystemsubvolumegroups.ceph.rook.io +spec: + group: ceph.rook.io + names: + kind: CephFilesystemSubVolumeGroup + listKind: CephFilesystemSubVolumeGroupList + plural: cephfilesystemsubvolumegroups + singular: cephfilesystemsubvolumegroup + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: CephFilesystemSubVolumeGroup represents a Ceph Filesystem SubVolumeGroup + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup + properties: + filesystemName: + description: FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of the CephFilesystem CR. If not coming from the CephFilesystem CR, it can be retrieved from the list of Ceph Filesystem volumes with `ceph fs volume ls`. To learn more about Ceph Filesystem abstractions see https://docs.ceph.com/en/latest/cephfs/fs-volumes/#fs-volumes-and-subvolumes + type: string + required: + - filesystemName + type: object + status: + description: Status represents the status of a CephFilesystem SubvolumeGroup + properties: + info: + additionalProperties: + type: string + nullable: true + type: object + phase: + description: ConditionType represent a resource's status + type: string + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - metadata + - spec + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c @@ -5998,72 +6064,6 @@ status: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition -metadata: - annotations: - controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c - creationTimestamp: null - name: cephfilesystemsubvolumegroups.ceph.rook.io -spec: - group: ceph.rook.io - names: - kind: CephFilesystemSubVolumeGroup - listKind: CephFilesystemSubVolumeGroupList - plural: cephfilesystemsubvolumegroups - singular: cephfilesystemsubvolumegroup - scope: Namespaced - versions: - - name: v1 - schema: - openAPIV3Schema: - description: CephFilesystemSubVolumeGroup represents a Ceph Filesystem SubVolumeGroup - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup - properties: - filesystemName: - description: FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of the CephFilesystem CR. If not coming from the CephFilesystem CR, it can be retrieved from the list of Ceph Filesystem volumes with `ceph fs volume ls`. To learn more about Ceph Filesystem abstractions see https://docs.ceph.com/en/latest/cephfs/fs-volumes/#fs-volumes-and-subvolumes - type: string - required: - - filesystemName - type: object - status: - description: Status represents the status of a CephFilesystem SubvolumeGroup - properties: - info: - additionalProperties: - type: string - nullable: true - type: object - phase: - description: ConditionType represent a resource's status - type: string - type: object - x-kubernetes-preserve-unknown-fields: true - required: - - metadata - - spec - type: object - served: true - storage: true - subresources: - status: {} -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.5.1-0.20210420220833-f284e2e8098c diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index ffa4016381c10..1092869830808 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -413,6 +413,8 @@ const ( ReconcileSucceeded ConditionReason = "ReconcileSucceeded" // ReconcileFailed represents when a resource reconciliation failed. ReconcileFailed ConditionReason = "ReconcileFailed" + // ReconcileStarted represents when a resource reconciliation started. + ReconcileStarted ConditionReason = "ReconcileStarted" // DeletingReason represents when Rook has detected a resource object should be deleted. DeletingReason ConditionReason = "Deleting" diff --git a/pkg/operator/ceph/object/notification/controller.go b/pkg/operator/ceph/object/notification/controller.go index 60594a25e9936..8b60b87bd2c3e 100644 --- a/pkg/operator/ceph/object/notification/controller.go +++ b/pkg/operator/ceph/object/notification/controller.go @@ -32,8 +32,12 @@ import ( "github.com/rook/rook/pkg/operator/ceph/object" "github.com/rook/rook/pkg/operator/ceph/object/bucket" "github.com/rook/rook/pkg/operator/ceph/object/topic" + "github.com/rook/rook/pkg/operator/ceph/reporting" + kapiv1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -59,6 +63,7 @@ type ReconcileNotifications struct { client client.Client context *clusterd.Context opManagerContext context.Context + recorder record.EventRecorder } // Add creates a new CephBucketNotification controller and a new ObjectBucketClaim Controller and adds it to the Manager. @@ -68,6 +73,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex client: mgr.GetClient(), context: context, opManagerContext: opManagerContext, + recorder: mgr.GetEventRecorderFor(controllerName), }); err != nil { return err } @@ -76,6 +82,7 @@ func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext contex client: mgr.GetClient(), context: context, opManagerContext: opManagerContext, + recorder: mgr.GetEventRecorderFor(controllerName), }) } @@ -100,86 +107,84 @@ func addNotificationReconciler(mgr manager.Manager, r reconcile.Reconciler) erro // The Controller will requeue the Request to be processed again if the returned error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. func (r *ReconcileNotifications) Reconcile(context context.Context, request reconcile.Request) (reconcile.Result, error) { - reconcileResponse, err := r.reconcile(request) + reconcileResponse, notification, err := r.reconcile(request) if err != nil { logger.Errorf("failed to reconcile %v", err) } - return reconcileResponse, err + return reporting.ReportReconcileResult(logger, r.recorder, notification, reconcileResponse, err) } -func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, error) { +func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile.Result, *cephv1.CephBucketNotification, error) { // fetch the CephBucketNotification instance - notification := &cephv1.CephBucketNotification{} + notification := &cephv1.CephBucketNotification{ObjectMeta: metav1.ObjectMeta{Name: request.Name, Namespace: request.Namespace}} + bnName := request.NamespacedName + r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileStarted), "Started reconciling CephBucketNotification %q", bnName) err := r.client.Get(r.opManagerContext, request.NamespacedName, notification) if err != nil { if kerrors.IsNotFound(err) { - logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", request.NamespacedName) - return reconcile.Result{}, nil + logger.Debugf("CephBucketNotification %q resource not found. Ignoring since resource must be deleted.", bnName) + return reconcile.Result{}, notification, nil } // Error reading the object - requeue the request. - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", request.NamespacedName) + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) } // DELETE: the CR was deleted if !notification.GetDeletionTimestamp().IsZero() { - logger.Debugf("CephBucketNotification %q was deleted", notification.Name) - + logger.Debugf("CephBucketNotification %q was deleted", bnName) // Return and do not requeue. Successful deletion. - return reconcile.Result{}, nil + return reconcile.Result{}, notification, nil } // get the topic associated with the notification, and make sure it is provisioned topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) if err != nil { - logger.Infof("CephBucketTopic %q not provisioned yet", topicName) - return waitForRequeueIfTopicNotReady, nil + return waitForRequeueIfTopicNotReady, notification, errors.Wrapf(err, "topic %q not provisioned yet", topicName) } // Populate clusterInfo during each reconcile clusterInfo, clusterSpec, err := getReadyCluster(r.client, r.opManagerContext, *r.context, bucketTopic.Spec.ObjectStoreNamespace) if err != nil { - return opcontroller.WaitForRequeueIfCephClusterNotReady, errors.Wrapf(err, "cluster is not ready") + return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.Wrapf(err, "cluster is not ready") } if clusterInfo == nil || clusterSpec == nil { - return opcontroller.WaitForRequeueIfCephClusterNotReady, nil + return opcontroller.WaitForRequeueIfCephClusterNotReady, notification, errors.New("cluster is not ready") } // fetch all OBCs that has a label matching this CephBucketNotification - namespace := notification.Namespace - bnName := types.NamespacedName{Namespace: namespace, Name: notification.Name} - namespaceListOpt := client.InNamespace(namespace) + namespaceListOpt := client.InNamespace(notification.Namespace) labelListOpt := client.MatchingLabels{ notificationLabelPrefix + notification.Name: notification.Name, } obcList := &bktv1alpha1.ObjectBucketClaimList{} err = r.client.List(r.opManagerContext, obcList, namespaceListOpt, labelListOpt) if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName) + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to list ObjectBucketClaims for CephBucketNotification %q", bnName) } if len(obcList.Items) == 0 { logger.Debugf("no ObjectbucketClaim associated with CephBucketNotification %q", bnName) - return reconcile.Result{}, nil + return reconcile.Result{}, notification, nil } // loop through all OBCs in the list and get their OBs for _, obc := range obcList.Items { if obc.Spec.ObjectBucketName == "" { - logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace}) - return waitForRequeueIfObjectBucketNotReady, nil + return waitForRequeueIfObjectBucketNotReady, notification, errors.Errorf("ObjectBucketClaim %q did not create the bucket yet", + types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace}) } ob := bktv1alpha1.ObjectBucket{} - bucketName := types.NamespacedName{Namespace: namespace, Name: obc.Spec.ObjectBucketName} + bucketName := types.NamespacedName{Namespace: notification.Namespace, Name: obc.Spec.ObjectBucketName} if err := r.client.Get(r.opManagerContext, bucketName, &ob); err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName) + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve ObjectBucket %v", bucketName) } objectStoreName, err := getCephObjectStoreName(ob) if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName) + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to get object store from ObjectBucket %q", bucketName) } if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { - return reconcile.Result{}, err + return reconcile.Result{}, notification, err } err = createNotificationFunc( @@ -196,11 +201,12 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile notification, ) if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName) + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to provision notification for ObjectBucketClaims %q", bucketName) } + logger.Infof("provisioned CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName) } - return reconcile.Result{}, nil + return reconcile.Result{}, notification, nil } func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (types.NamespacedName, error) { diff --git a/pkg/operator/ceph/object/notification/controller_test.go b/pkg/operator/ceph/object/notification/controller_test.go index 5c618a981ee19..501bf7e379eac 100644 --- a/pkg/operator/ceph/object/notification/controller_test.go +++ b/pkg/operator/ceph/object/notification/controller_test.go @@ -20,7 +20,9 @@ package notification import ( "context" "os" + "strings" "testing" + "time" "github.com/coreos/pkg/capnslog" bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" @@ -38,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -55,11 +58,22 @@ var ( multipleCreateBucketName = "multi-create" multipleDeleteBucketName = "multi-delete" multipleBothBucketName = "multi-both" + bnName = types.NamespacedName{Name: testNotificationName, Namespace: testNamespace} + startEvent = string(cephv1.ReconcileStarted) + finishedEvent = string(cephv1.ReconcileSucceeded) + failedEvent = string(cephv1.ReconcileFailed) ) -var getWasInvoked bool -var createdNotifications []string -var deletedNotifications []string +// global variables used inside mockSetup +var ( + testCtx = context.TODO() + testContext *clusterd.Context + testScheme = scheme.Scheme + testRecorder = record.NewFakeRecorder(256) + getWasInvoked = false + createdNotifications []string + deletedNotifications []string +) func resetValues() { getWasInvoked = false @@ -74,7 +88,52 @@ func mockCleanup() { deleteNotificationFunc = deleteNotification } -func mockSetup() { +func mockSetup(t *testing.T) { + // set log level + capnslog.SetGlobalLogLevel(capnslog.DEBUG) + os.Setenv("ROOK_LOG_LEVEL", "DEBUG") + + // create clients + testContext = &clusterd.Context{ + Executor: &exectest.MockExecutor{}, + RookClientset: rookclient.NewSimpleClientset(), + Clientset: test.New(t, 3), + } + + // create scheme + testScheme.AddKnownTypes( + cephv1.SchemeGroupVersion, + &cephv1.CephBucketNotification{}, + &cephv1.CephBucketNotificationList{}, + &cephv1.CephBucketTopic{}, + &cephv1.CephBucketTopicList{}, + &cephv1.CephCluster{}, + &cephv1.CephClusterList{}, + &bktv1alpha1.ObjectBucketClaim{}, + &bktv1alpha1.ObjectBucketClaimList{}, + &bktv1alpha1.ObjectBucket{}, + &bktv1alpha1.ObjectBucketList{}, + ) + + // create secrets + secrets := map[string][]byte{ + "fsid": []byte("name"), + "mon-secret": []byte("monsecret"), + "admin-secret": []byte("adminsecret"), + } + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rook-ceph-mon", + Namespace: testNamespace, + }, + Data: secrets, + Type: k8sutil.RookType, + } + + _, err := testContext.Clientset.CoreV1().Secrets(testNamespace).Create(testCtx, secret, metav1.CreateOptions{}) + assert.NoError(t, err) + + // test mocks createNotificationFunc = func(p provisioner, bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error { createdNotifications = append(createdNotifications, notification.Name) return nil @@ -99,12 +158,55 @@ func mockSetup() { } } +func testReconciler(objects []runtime.Object, notificationName string) (reconcile.Result, error) { + defer func() { testRecorder.Events <- "END" }() + cl := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(objects...).Build() + r := &ReconcileNotifications{client: cl, context: testContext, opManagerContext: testCtx, recorder: testRecorder} + testRequest := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: notificationName, + Namespace: testNamespace, + }, + } + return r.Reconcile(testCtx, testRequest) +} + +func testOBCLabelReconciler(objects []runtime.Object, bucketName string) (reconcile.Result, error) { + defer func() { testRecorder.Events <- "END" }() + cl := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(objects...).Build() + r := &ReconcileOBCLabels{client: cl, context: testContext, opManagerContext: testCtx, recorder: testRecorder} + testRequest := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: bucketName, + Namespace: testNamespace, + }, + } + return r.Reconcile(testCtx, testRequest) +} + +func verifyEvents(t *testing.T, expectedEvents []string) { + expectedEvents = append(expectedEvents, "END") + for _, expectedEvent := range expectedEvents { + select { + case event := <-testRecorder.Events: + if expectedEvent != "END" { + splitEvent := strings.Split(event, " ") + // the event message must have at least 3 parts + assert.GreaterOrEqual(t, len(splitEvent), 3) + // the type of event (2nd part) must match + assert.Equal(t, splitEvent[1], expectedEvent) + } else { + assert.Equal(t, expectedEvent, event) + } + case <-time.After(1 * time.Second): + assert.Failf(t, "missing event", "missing event: \"%s\"", expectedEvent) + } + } +} + func TestCephBucketNotificationController(t *testing.T) { - mockSetup() + mockSetup(t) defer mockCleanup() - ctx := context.TODO() - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - os.Setenv("ROOK_LOG_LEVEL", "DEBUG") bucketTopic := &cephv1.CephBucketTopic{ ObjectMeta: metav1.ObjectMeta{ @@ -149,48 +251,6 @@ func TestCephBucketNotificationController(t *testing.T) { }, }, } - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: testNotificationName, - Namespace: testNamespace, - }, - } - - s := scheme.Scheme - s.AddKnownTypes( - cephv1.SchemeGroupVersion, - &cephv1.CephBucketNotification{}, - &cephv1.CephBucketNotificationList{}, - &cephv1.CephBucketTopic{}, - &cephv1.CephBucketTopicList{}, - &cephv1.CephCluster{}, - &cephv1.CephClusterList{}, - &bktv1alpha1.ObjectBucketClaim{}, - &bktv1alpha1.ObjectBucketClaimList{}, - ) - - c := &clusterd.Context{ - Executor: &exectest.MockExecutor{}, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: test.New(t, 3), - } - - secrets := map[string][]byte{ - "fsid": []byte("name"), - "mon-secret": []byte("monsecret"), - "admin-secret": []byte("adminsecret"), - } - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rook-ceph-mon", - Namespace: testNamespace, - }, - Data: secrets, - Type: k8sutil.RookType, - } - - _, err := c.Clientset.CoreV1().Secrets(testNamespace).Create(ctx, secret, metav1.CreateOptions{}) - assert.NoError(t, err) t.Run("create notification configuration without a topic", func(t *testing.T) { // Objects to track in the fake client. @@ -198,18 +258,13 @@ func TestCephBucketNotificationController(t *testing.T) { bucketNotification, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) // provisioning requeued because the topic is not configured assert.NoError(t, err) assert.True(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("create notification and topic configuration when there is no cluster", func(t *testing.T) { @@ -218,18 +273,13 @@ func TestCephBucketNotificationController(t *testing.T) { bucketTopic, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) // provisioning requeued because the cluster does not exist assert.NoError(t, err) assert.True(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("create notification and topic configuration cluster is not ready", func(t *testing.T) { @@ -239,18 +289,13 @@ func TestCephBucketNotificationController(t *testing.T) { cephCluster, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) // provisioning requeued because the cluster is not ready assert.NoError(t, err) assert.True(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("create notification and topic configuration when topic is not yet provisioned", func(t *testing.T) { @@ -262,18 +307,12 @@ func TestCephBucketNotificationController(t *testing.T) { cephCluster, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) // provisioning requeued because the topic is not provisioned on the RGW assert.NoError(t, err) assert.True(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) - assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("create notification and topic configuration", func(t *testing.T) { @@ -286,26 +325,17 @@ func TestCephBucketNotificationController(t *testing.T) { cephCluster, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) assert.NoError(t, err) assert.False(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) - assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, finishedEvent}) }) } func TestCephBucketNotificationControllerWithOBC(t *testing.T) { - mockSetup() + mockSetup(t) defer mockCleanup() - ctx := context.TODO() - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - os.Setenv("ROOK_LOG_LEVEL", "DEBUG") bucketTopic := &cephv1.CephBucketTopic{ ObjectMeta: metav1.ObjectMeta{ @@ -369,50 +399,6 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { Phase: bktv1alpha1.ObjectBucketClaimStatusPhasePending, }, } - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: testNotificationName, - Namespace: testNamespace, - }, - } - - s := scheme.Scheme - s.AddKnownTypes( - cephv1.SchemeGroupVersion, - &cephv1.CephBucketNotification{}, - &cephv1.CephBucketNotificationList{}, - &cephv1.CephBucketTopic{}, - &cephv1.CephBucketTopicList{}, - &cephv1.CephCluster{}, - &cephv1.CephClusterList{}, - &bktv1alpha1.ObjectBucketClaim{}, - &bktv1alpha1.ObjectBucketClaimList{}, - &bktv1alpha1.ObjectBucket{}, - &bktv1alpha1.ObjectBucketList{}, - ) - - c := &clusterd.Context{ - Executor: &exectest.MockExecutor{}, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: test.New(t, 3), - } - - secrets := map[string][]byte{ - "fsid": []byte("name"), - "mon-secret": []byte("monsecret"), - "admin-secret": []byte("adminsecret"), - } - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rook-ceph-mon", - Namespace: testNamespace, - }, - Data: secrets, - Type: k8sutil.RookType, - } - - _, err := c.Clientset.CoreV1().Secrets(testNamespace).Create(ctx, secret, metav1.CreateOptions{}) - assert.NoError(t, err) t.Run("provision notification when OBC exists but no OB", func(t *testing.T) { objects := []runtime.Object{ @@ -422,17 +408,11 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { obc, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) assert.NoError(t, err) assert.True(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) - assert.NoError(t, err, bucketNotification) assert.Equal(t, 0, len(createdNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("provision notification when OB exists", func(t *testing.T) { @@ -466,17 +446,11 @@ func TestCephBucketNotificationControllerWithOBC(t *testing.T) { ob, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileNotifications{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testReconciler(objects, testNotificationName) assert.NoError(t, err) assert.False(t, res.Requeue) - // notification configuration is set - err = r.client.Get(ctx, types.NamespacedName{Name: testNotificationName, Namespace: testNamespace}, bucketNotification) - assert.NoError(t, err, bucketNotification) assert.Equal(t, 1, len(createdNotifications)) + verifyEvents(t, []string{startEvent, finishedEvent}) }) } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller.go b/pkg/operator/ceph/object/notification/obc_label_controller.go index 61e229e1f40da..f4af937dd6544 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller.go @@ -28,9 +28,13 @@ import ( opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" "github.com/rook/rook/pkg/operator/ceph/object/bucket" "github.com/rook/rook/pkg/operator/ceph/object/topic" + "github.com/rook/rook/pkg/operator/ceph/reporting" + kapiv1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -48,6 +52,7 @@ type ReconcileOBCLabels struct { client client.Client context *clusterd.Context opManagerContext context.Context + recorder record.EventRecorder } func addOBCLabelReconciler(mgr manager.Manager, r reconcile.Reconciler) error { @@ -139,7 +144,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res } bnList, err := getAllNotificationsFunc(p, &ob) if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed delete all bucket notifications from ObjectbucketClaim %q", bucketName) + return reconcile.Result{}, errors.Wrapf(err, "failed to list bucket notifications in ObjectbucketClaim %q", bucketName) } labelList := make([]string, 0) @@ -175,42 +180,46 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res return waitForRequeueIfNotificationNotDeleted, nil } // add new notifications to the list - return r.addNewNotifications(p, ob, labelList, objectStoreName, obc.Namespace) + for _, label := range labelList { + reconcileResponse, notification, err := r.addNewNotification(p, ob, label, objectStoreName, obc.Namespace) + reporting.ReportReconcileResult(logger, r.recorder, notification, reconcileResponse, err) + if err != nil { + return reconcileResponse, err + } + } + return reconcile.Result{}, nil } -func (r *ReconcileOBCLabels) addNewNotifications(p provisioner, ob bktv1alpha1.ObjectBucket, notificationList []string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, error) { - for _, labelValue := range notificationList { - // for each notification label fetch the bucket notification CRD - notification := &cephv1.CephBucketNotification{} - bnName := types.NamespacedName{Namespace: namespace, Name: labelValue} - if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { - if kerrors.IsNotFound(err) { - logger.Infof("CephBucketNotification %q not found in %q ", bnName.Name, bnName.Namespace) - return waitForRequeueIfNotificationNotReady, nil - } - return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) +func (r *ReconcileOBCLabels) addNewNotification(p provisioner, ob bktv1alpha1.ObjectBucket, label string, objectStoreName types.NamespacedName, namespace string) (reconcile.Result, *cephv1.CephBucketNotification, error) { + // for each notification label fetch the bucket notification CRD + notification := &cephv1.CephBucketNotification{ObjectMeta: metav1.ObjectMeta{Name: label, Namespace: namespace}} + bnName := types.NamespacedName{Namespace: namespace, Name: label} + bucketName := types.NamespacedName{Name: ob.Spec.ClaimRef.Name, Namespace: namespace} + r.recorder.Eventf(notification, kapiv1.EventTypeNormal, string(cephv1.ReconcileStarted), "Started reconciling CephBucketNotification %q for ObjectBucketClaim %q", bnName, bucketName) + if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil { + if kerrors.IsNotFound(err) { + return waitForRequeueIfNotificationNotReady, notification, errors.Wrapf(err, "CephBucketNotification %q not provisioned yet", bnName) } + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName) + } - logger.Debugf("adding bucket notification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name) - // get the topic associated with the notification, and make sure it is provisioned - topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} - bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) - if err != nil { - logger.Infof("CephBucketTopic %q not provisioned yet in %q", topicName.Name, topicName.Namespace) - return waitForRequeueIfTopicNotReady, nil - } + // get the topic associated with the notification, and make sure it is provisioned + topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic} + bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName) + if err != nil { + return waitForRequeueIfTopicNotReady, notification, errors.Wrapf(err, "topic %q not provisioned yet", topicName) + } - if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { - return reconcile.Result{}, err - } + if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil { + return reconcile.Result{}, notification, err + } - // provision the notification - err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) - if err != nil { - return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName) - } - logger.Infof("provisioned CephBucketNotification %q in namespace %q to obc %q", bnName.Name, bnName.Namespace, ob.Spec.ClaimRef.Name) + // provision the notification + err = createNotificationFunc(p, &ob, *bucketTopic.Status.ARN, notification) + if err != nil { + return reconcile.Result{}, notification, errors.Wrapf(err, "failed to provision notification for ObjectBucketClaims %q", bucketName) } + logger.Infof("provisioned CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName) - return reconcile.Result{}, nil + return reconcile.Result{}, notification, nil } diff --git a/pkg/operator/ceph/object/notification/obc_label_controller_test.go b/pkg/operator/ceph/object/notification/obc_label_controller_test.go index 2ca11f81f26de..1495b27b9f9b5 100644 --- a/pkg/operator/ceph/object/notification/obc_label_controller_test.go +++ b/pkg/operator/ceph/object/notification/obc_label_controller_test.go @@ -18,29 +18,17 @@ limitations under the License. package notification import ( - "context" - "os" "testing" - "github.com/coreos/pkg/capnslog" bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" - rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake" - "github.com/rook/rook/pkg/operator/test" - "github.com/rook/rook/pkg/clusterd" "github.com/rook/rook/pkg/operator/ceph/object" "github.com/rook/rook/pkg/operator/k8sutil" - exectest "github.com/rook/rook/pkg/util/exec/test" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) func createOBResources(name string) (*bktv1alpha1.ObjectBucketClaim, *bktv1alpha1.ObjectBucket) { @@ -107,11 +95,8 @@ func setNotificationLabels(labelList []string) map[string]string { } func TestCephBucketNotificationOBCLabelController(t *testing.T) { - mockSetup() + mockSetup(t) defer mockCleanup() - ctx := context.TODO() - capnslog.SetGlobalLogLevel(capnslog.DEBUG) - os.Setenv("ROOK_LOG_LEVEL", "DEBUG") bucketTopic := &cephv1.CephBucketTopic{ ObjectMeta: metav1.ObjectMeta{ @@ -146,54 +131,9 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { }, } - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: testBucketName, - Namespace: testNamespace, - }, - } - - s := scheme.Scheme - s.AddKnownTypes( - cephv1.SchemeGroupVersion, - &cephv1.CephBucketNotification{}, - &cephv1.CephBucketNotificationList{}, - &cephv1.CephBucketTopic{}, - &cephv1.CephBucketTopicList{}, - &cephv1.CephCluster{}, - &cephv1.CephClusterList{}, - &bktv1alpha1.ObjectBucketClaim{}, - &bktv1alpha1.ObjectBucketClaimList{}, - &bktv1alpha1.ObjectBucket{}, - &bktv1alpha1.ObjectBucketList{}, - ) - - c := &clusterd.Context{ - Executor: &exectest.MockExecutor{}, - RookClientset: rookclient.NewSimpleClientset(), - Clientset: test.New(t, 3), - } - - secrets := map[string][]byte{ - "fsid": []byte("name"), - "mon-secret": []byte("monsecret"), - "admin-secret": []byte("adminsecret"), - } - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "rook-ceph-mon", - Namespace: testNamespace, - }, - Data: secrets, - Type: k8sutil.RookType, - } - obc, ob := createOBResources(testBucketName) obc.Labels = setNotificationLabels([]string{testNotificationName}) - _, err := c.Clientset.CoreV1().Secrets(testNamespace).Create(ctx, secret, metav1.CreateOptions{}) - assert.NoError(t, err) - t.Run("provision OBC with notification label with no ob", func(t *testing.T) { resetValues() objects := []runtime.Object{ @@ -201,16 +141,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { obc, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, testBucketName) assert.NoError(t, err) assert.True(t, res.Requeue) assert.False(t, getWasInvoked) assert.Equal(t, 0, len(createdNotifications)) assert.Equal(t, 0, len(deletedNotifications)) + verifyEvents(t, []string{}) }) t.Run("provision OBC with notification label with not ready ob", func(t *testing.T) { @@ -221,16 +158,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { ob, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, testBucketName) assert.NoError(t, err) assert.True(t, res.Requeue) assert.False(t, getWasInvoked) assert.Equal(t, 0, len(createdNotifications)) assert.Equal(t, 0, len(deletedNotifications)) + verifyEvents(t, []string{}) }) obc.Spec.ObjectBucketName = testBucketName @@ -244,16 +178,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { ob, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) - assert.NoError(t, err) + res, err := testOBCLabelReconciler(objects, testBucketName) + assert.Error(t, err) assert.True(t, res.Requeue) assert.True(t, getWasInvoked) assert.Equal(t, 0, len(createdNotifications)) assert.Equal(t, 0, len(deletedNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) bucketNotification := createBucketNotification(testNotificationName) @@ -266,16 +197,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { bucketNotification, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) - assert.NoError(t, err) + res, err := testOBCLabelReconciler(objects, testBucketName) + assert.Error(t, err) assert.True(t, res.Requeue) assert.True(t, getWasInvoked) assert.Equal(t, 0, len(createdNotifications)) assert.Equal(t, 0, len(deletedNotifications)) + verifyEvents(t, []string{startEvent, failedEvent}) }) t.Run("provision OBC with notification label", func(t *testing.T) { @@ -288,11 +216,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { bucketTopic, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, testBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -307,7 +231,6 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { noChangeOBC.Labels = setNotificationLabels([]string{testNotificationName}) noChangeOBC.Spec.ObjectBucketName = noChangeBucketName noChangeOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound - req.NamespacedName.Name = noChangeBucketName objects := []runtime.Object{ cephCluster, noChangeOBC, @@ -316,11 +239,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { bucketTopic, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, noChangeBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -335,18 +254,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { deleteOBC.Spec.GenerateBucketName = deleteBucketName deleteOBC.Spec.ObjectBucketName = deleteBucketName deleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound - req.NamespacedName.Name = deleteBucketName objects := []runtime.Object{ cephCluster, deleteOBC, deleteOB, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, deleteBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -364,7 +278,6 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { multipleCreateOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound bucketNotification1 := createBucketNotification(testNotificationName + "-1") bucketNotification2 := createBucketNotification(testNotificationName + "-2") - req.NamespacedName.Name = multipleCreateBucketName objects := []runtime.Object{ cephCluster, @@ -375,11 +288,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { bucketTopic, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, multipleCreateBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -394,18 +303,13 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { multipleDeleteOBC.Spec.GenerateBucketName = multipleDeleteBucketName multipleDeleteOBC.Spec.ObjectBucketName = multipleDeleteBucketName multipleDeleteOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound - req.NamespacedName.Name = multipleDeleteBucketName objects := []runtime.Object{ cephCluster, multipleDeleteOBC, multipleDeleteOB, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, multipleDeleteBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -422,7 +326,6 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { multipleBothOBC.Status.Phase = bktv1alpha1.ObjectBucketClaimStatusPhaseBound bucketNotification1 := createBucketNotification(testNotificationName + "-1") bucketNotification2 := createBucketNotification(testNotificationName + "-2") - req.NamespacedName.Name = multipleBothBucketName objects := []runtime.Object{ cephCluster, @@ -433,11 +336,7 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { bucketTopic, } - cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build() - - r := &ReconcileOBCLabels{client: cl, context: c, opManagerContext: ctx} - - res, err := r.Reconcile(ctx, req) + res, err := testOBCLabelReconciler(objects, multipleBothBucketName) assert.NoError(t, err) assert.False(t, res.Requeue) assert.True(t, getWasInvoked) @@ -446,5 +345,6 @@ func TestCephBucketNotificationOBCLabelController(t *testing.T) { assert.Equal(t, 2, len(deletedNotifications)) assert.ElementsMatch(t, deletedNotifications, []string{multipleDeleteBucketName + testNotificationName + "-1", multipleDeleteBucketName + testNotificationName + "-2"}) + verifyEvents(t, []string{startEvent, finishedEvent}) }) }