Skip to content

Commit

Permalink
Merge pull request #9457 from leseb/fix-9452
Browse files Browse the repository at this point in the history
core: disallow multiple clusters in the same namespace
  • Loading branch information
leseb committed Dec 20, 2021
2 parents 9dc41f5 + da76a77 commit fffb862
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/controller.go
Expand Up @@ -140,7 +140,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
},
},
&handler.EnqueueRequestForObject{},
watchControllerPredicate())
watchControllerPredicate(opManagerContext, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/operator/ceph/cluster/predicate.go
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package cluster

import (
"context"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
Expand Down Expand Up @@ -105,9 +107,12 @@ func isHotPlugCM(obj runtime.Object) bool {
return false
}

func watchControllerPredicate() predicate.Funcs {
func watchControllerPredicate(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
if controller.DuplicateCephClusters(ctx, c, e.Object, true) {
return false
}
logger.Debug("create event from a CR")
return true
},
Expand All @@ -116,6 +121,13 @@ func watchControllerPredicate() predicate.Funcs {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
// We still need to check on update event since the user must delete the additional CR
// Until this is done, the user can still update the CR and the operator will reconcile
// This should not happen
if controller.DuplicateCephClusters(ctx, c, e.ObjectOld, true) {
return false
}

// resource.Quantity has non-exportable fields, so we use its comparator method
resourceQtyComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })

Expand Down
36 changes: 36 additions & 0 deletions pkg/operator/ceph/controller/predicate.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"os"
"reflect"
Expand Down Expand Up @@ -699,3 +700,38 @@ func ReloadManager() {
p, _ := os.FindProcess(os.Getpid())
_ = p.Signal(syscall.SIGHUP)
}

// DuplicateCephClusters determine whether a similar object exists in the same namespace
// mainly used for the CephCluster which we only support a single instance per namespace
func DuplicateCephClusters(ctx context.Context, c client.Client, object client.Object, log bool) bool {
objectType, ok := object.(*cephv1.CephCluster)
if !ok {
logger.Errorf("expected type CephCluster but found %T", objectType)
return false
}

cephClusterList := &cephv1.CephClusterList{}
listOpts := []client.ListOption{
client.InNamespace(object.GetNamespace()),
}
err := c.List(ctx, cephClusterList, listOpts...)
if err != nil {
logger.Errorf("failed to list ceph clusters, assuming there is none, not reconciling. %v", err)
return true
}

// This check is needed when the operator is down and a cluster was created
if len(cephClusterList.Items) > 1 {
// Since multiple predicate are using this function we don't want all of them to log the
// same message, so one predicate can log and the other cannot
if log {
logger.Errorf("found more than one ceph cluster in namespace %q. not reconciling. only one ceph cluster per namespace.", object.GetNamespace())
for _, cluster := range cephClusterList.Items {
logger.Errorf("found ceph cluster %q in namespace %q", cluster.Name, cluster.Namespace)
}
}
return true
}

return false
}
58 changes: 58 additions & 0 deletions pkg/operator/ceph/controller/predicate_test.go
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"testing"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var (
Expand Down Expand Up @@ -219,3 +223,57 @@ func TestIsDoNotReconcile(t *testing.T) {
b = IsDoNotReconcile(l)
assert.True(t, b)
}

func TestDuplicateCephClusters(t *testing.T) {
ctx := context.TODO()
namespace := "rook-ceph"
cephCluster := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a",
Namespace: namespace,
},
}
s := scheme.Scheme
s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

t.Run("success - only one ceph cluster", func(t *testing.T) {
object := []runtime.Object{
cephCluster,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.False(t, DuplicateCephClusters(ctx, cl, cephCluster, false))
})

t.Run("success - we have more than one cluster but they are in different namespaces", func(t *testing.T) {
dup := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: "anotherns",
},
}
object := []runtime.Object{
cephCluster,
dup,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.False(t, DuplicateCephClusters(ctx, cl, dup, true))
})

t.Run("fail - we have more than one cluster in the same namespace", func(t *testing.T) {
dup := &cephv1.CephCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: namespace,
},
}
object := []runtime.Object{
cephCluster,
dup,
}
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
assert.True(t, DuplicateCephClusters(ctx, cl, dup, true))
})
}
8 changes: 4 additions & 4 deletions pkg/operator/ceph/csi/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileCSI struct {
// Add creates a new Ceph CSI Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/operator/ceph/csi/predicate.go
Expand Up @@ -17,19 +17,22 @@ limitations under the License.
package csi

import (
"context"
"regexp"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -41,6 +44,10 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the csi driver
if cephCluster, ok := e.Object.(*cephv1.CephCluster); ok {
// If there are more than one ceph cluster in the same namespace do not reconcile
if controller.DuplicateCephClusters(ctx, c, e.Object, false) {
return false
}
// This allows us to avoid a double reconcile of the CSI controller if this is not
// the first generation of the CephCluster. So only return true if this is the very
// first instance of the CephCluster
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/disruption/clusterdisruption/add.go
Expand Up @@ -60,7 +60,7 @@ func Add(mgr manager.Manager, context *controllerconfig.Context) error {

cephClusterPredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
logger.Info("create event from ceph cluster CR")
logger.Debug("create event from ceph cluster CR")
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/ceph/object/bucket/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileBucket struct {
// Add creates a new Ceph CSI Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/operator/ceph/object/bucket/predicate.go
Expand Up @@ -17,17 +17,20 @@ limitations under the License.
package bucket

import (
"context"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const rookOBCWatchOperatorNamespace = "ROOK_OBC_WATCH_OPERATOR_NAMESPACE"

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -39,8 +42,8 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the bucket provisioner
if _, ok := e.Object.(*cephv1.CephCluster); ok {
// Always return true, so when the controller starts we reconcile too. We don't get
return true
// If there are more than one ceph cluster in the same namespace do not reconcile
return !controller.DuplicateCephClusters(ctx, c, e.Object, false)
}

return false
Expand Down

0 comments on commit fffb862

Please sign in to comment.