Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: disallow multiple clusters in the same namespace #9457

Merged
merged 2 commits into from Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/controller.go
Expand Up @@ -139,7 +139,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
},
},
&handler.EnqueueRequestForObject{},
watchControllerPredicate())
watchControllerPredicate(opManagerContext, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/operator/ceph/cluster/predicate.go
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package cluster

import (
"context"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
Expand Down Expand Up @@ -105,9 +107,12 @@ func isHotPlugCM(obj runtime.Object) bool {
return false
}

func watchControllerPredicate() predicate.Funcs {
func watchControllerPredicate(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
if controller.DuplicateCephClusters(ctx, c, e.Object, true) {
return false
}
logger.Debug("create event from a CR")
return true
},
Expand All @@ -116,6 +121,13 @@ func watchControllerPredicate() predicate.Funcs {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
// We still need to check on update event since the user must delete the additional CR
// Until this is done, the user can still update the CR and the operator will reconcile
// This should not happen
if controller.DuplicateCephClusters(ctx, c, e.ObjectOld, true) {
return false
}

// resource.Quantity has non-exportable fields, so we use its comparator method
resourceQtyComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })

Expand Down
36 changes: 36 additions & 0 deletions pkg/operator/ceph/controller/predicate.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"os"
"reflect"
Expand Down Expand Up @@ -699,3 +700,38 @@ func ReloadManager() {
p, _ := os.FindProcess(os.Getpid())
_ = p.Signal(syscall.SIGHUP)
}

// DuplicateCephClusters determine whether a similar object exists in the same namespace
// mainly used for the CephCluster which we only support a single instance per namespace
func DuplicateCephClusters(ctx context.Context, c client.Client, object client.Object, log bool) bool {
objectType, ok := object.(*cephv1.CephCluster)
if !ok {
logger.Errorf("expected type CephCluster but found %T", objectType)
return false
}

cephClusterList := &cephv1.CephClusterList{}
listOpts := []client.ListOption{
client.InNamespace(object.GetNamespace()),
}
err := c.List(ctx, cephClusterList, listOpts...)
if err != nil {
logger.Errorf("failed to list ceph clusters, assuming there is none, not reconciling. %v", err)
return true
}

// This check is needed when the operator is down and a cluster was created
if len(cephClusterList.Items) > 1 {
// Since multiple predicate are using this function we don't want all of them to log the
// same message, so one predicate can log and the other cannot
if log {
logger.Errorf("found more than one ceph cluster in namespace %q. not reconciling. only one ceph cluster per namespace.", object.GetNamespace())
for _, cluster := range cephClusterList.Items {
logger.Errorf("found ceph cluster %q in namespace %q", cluster.Name, cluster.Namespace)
}
}
return true
}

return false
}
58 changes: 58 additions & 0 deletions pkg/operator/ceph/controller/predicate_test.go
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"testing"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var (
Expand Down Expand Up @@ -215,3 +219,57 @@ func TestIsDoNotReconcile(t *testing.T) {
b = IsDoNotReconcile(l)
assert.True(t, b)
}

func TestDuplicateCephClusters(t *testing.T) {
ctx := context.TODO()
namespace := "rook-ceph"
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a",
Namespace: namespace,
},
}
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

t.Run("success - only one ceph cluster", func(t *testing.T) {
object := []runtime.Object{
cephCluster,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.False(t, DuplicateCephClusters(ctx, cl, cephCluster, false))
})

t.Run("success - we have more than one cluster but they are in different namespaces", func(t *testing.T) {
dup := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: "anotherns",
},
}
object := []runtime.Object{
cephCluster,
dup,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.False(t, DuplicateCephClusters(ctx, cl, dup, true))
})

t.Run("fail - we have more than one cluster in the same namespace", func(t *testing.T) {
dup := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: namespace,
},
}
object := []runtime.Object{
cephCluster,
dup,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.True(t, DuplicateCephClusters(ctx, cl, dup, true))
})
}
8 changes: 4 additions & 4 deletions pkg/operator/ceph/csi/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileCSI struct {
// Add creates a new Ceph CSI Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/operator/ceph/csi/predicate.go
Expand Up @@ -17,19 +17,22 @@ limitations under the License.
package csi

import (
"context"
"regexp"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -41,6 +44,10 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the csi driver
if cephCluster, ok := e.Object.(*cephv1.CephCluster); ok {
// If there are more than one ceph cluster in the same namespace do not reconcile
if controller.DuplicateCephClusters(ctx, c, e.Object, false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to skip reconcile in the csi and obc controllers in this case? It's a rare enough error that it doesn't seem necessary to check here and just keep these other controllers simple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't check, the logs will be a bit cluttered and users might skip the useful information. CSI and OBC controllers will reconcile and hide the message:

2021-12-17 09:27:02.826962 E | ceph-spec: found more than one ceph cluster in namespace "rook-ceph". not reconciling. only one ceph cluster per namespace.
2021-12-17 09:27:02.826989 E | ceph-spec: found ceph cluster "lesebb" in namespace "rook-ceph"
2021-12-17 09:27:02.826994 E | ceph-spec: found ceph cluster "rook-ceph" in namespace "rook-ceph"

return false
}
// This allows us to avoid a double reconcile of the CSI controller if this is not
// the first generation of the CephCluster. So only return true if this is the very
// first instance of the CephCluster
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/disruption/clusterdisruption/add.go
Expand Up @@ -60,7 +60,7 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {

cephClusterPredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
logger.Info("create event from ceph cluster CR")
logger.Debug("create event from ceph cluster CR")
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/ceph/object/bucket/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileBucket struct {
// Add creates a new Ceph CSI Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/operator/ceph/object/bucket/predicate.go
Expand Up @@ -17,17 +17,20 @@ limitations under the License.
package bucket

import (
"context"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const rookOBCWatchOperatorNamespace = "ROOK_OBC_WATCH_OPERATOR_NAMESPACE"

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -39,8 +42,8 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the bucket provisioner
if _, ok := e.Object.(*cephv1.CephCluster); ok {
// Always return true, so when the controller starts we reconcile too. We don't get
return true
// If there are more than one ceph cluster in the same namespace do not reconcile
return !controller.DuplicateCephClusters(ctx, c, e.Object, false)
}

return false
Expand Down