Skip to content

Commit

Permalink
core: disallow multiple clusters in the same namespace
Browse files Browse the repository at this point in the history
Rook does not support running multiple clusters in the same namespace,
so the operator should not reconcile if a new cluster is added.
The scenario where a cluster is added while the operator is down is also
handled. CR updates are also handled.
When the operator detects more than one cluster it will refuse to
reconcile the CephCluster, and child CRDs will block too until the
operator is ready.
The user must remove one of the clusters before the operator can continue
to perform any reconcile.

Closes: #9452
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb committed Dec 17, 2021
1 parent 52ededd commit da76a77
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/controller.go
Expand Up @@ -139,7 +139,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
},
},
&handler.EnqueueRequestForObject{},
watchControllerPredicate())
watchControllerPredicate(opManagerContext, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/operator/ceph/cluster/predicate.go
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package cluster

import (
"context"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/clusterd"
Expand Down Expand Up @@ -105,9 +107,12 @@ func isHotPlugCM(obj runtime.Object) bool {
return false
}

func watchControllerPredicate() predicate.Funcs {
func watchControllerPredicate(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
if controller.DuplicateCephClusters(ctx, c, e.Object, true) {
return false
}
logger.Debug("create event from a CR")
return true
},
Expand All @@ -116,6 +121,13 @@ func watchControllerPredicate() predicate.Funcs {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
// We still need to check on update events: until the user deletes the additional CR,
// they can still update either CR, which would otherwise trigger a reconcile.
// That must not happen while duplicate clusters exist.
if controller.DuplicateCephClusters(ctx, c, e.ObjectOld, true) {
return false
}

// resource.Quantity has non-exportable fields, so we use its comparator method
resourceQtyComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })

Expand Down
36 changes: 36 additions & 0 deletions pkg/operator/ceph/controller/predicate.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"os"
"reflect"
Expand Down Expand Up @@ -699,3 +700,38 @@ func ReloadManager() {
p, _ := os.FindProcess(os.Getpid())
_ = p.Signal(syscall.SIGHUP)
}

// DuplicateCephClusters reports whether reconciliation must be refused because more than one
// CephCluster exists in the namespace of the given object. Only a single CephCluster per
// namespace is supported.
//
// ctx and c are used to list the CephClusters in the object's namespace. object is expected to
// be a *cephv1.CephCluster; any other type is logged and treated as "no duplicate" (false).
// log controls whether the duplicate clusters are reported in the operator log, so that when
// several predicates call this function only one of them emits the messages.
//
// It returns true when more than one CephCluster is found, and also when the list call fails,
// so that callers do not reconcile on incomplete information.
func DuplicateCephClusters(ctx context.Context, c client.Client, object client.Object, log bool) bool {
	if _, ok := object.(*cephv1.CephCluster); !ok {
		// Print the dynamic type of what was actually passed in. The result of the failed
		// assertion is always a nil *cephv1.CephCluster and would report a misleading type.
		logger.Errorf("expected type CephCluster but found %T", object)
		return false
	}

	namespace := object.GetNamespace()
	cephClusterList := &cephv1.CephClusterList{}
	if err := c.List(ctx, cephClusterList, client.InNamespace(namespace)); err != nil {
		// Without the list we cannot prove there is a single cluster, so block the reconcile.
		logger.Errorf("failed to list ceph clusters in namespace %q, not reconciling. %v", namespace, err)
		return true
	}

	// Listing (rather than trusting the event alone) also covers clusters that were created
	// while the operator was down.
	if len(cephClusterList.Items) <= 1 {
		return false
	}

	// Since multiple predicates are using this function we don't want all of them to log the
	// same message, so one predicate can log and the others cannot.
	if log {
		logger.Errorf("found more than one ceph cluster in namespace %q. not reconciling. only one ceph cluster per namespace.", object.GetNamespace())
		for _, cluster := range cephClusterList.Items {
			logger.Errorf("found ceph cluster %q in namespace %q", cluster.Name, cluster.Namespace)
		}
	}

	return true
}
58 changes: 58 additions & 0 deletions pkg/operator/ceph/controller/predicate_test.go
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"testing"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
"github.com/rook/rook/pkg/operator/ceph/config"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var (
Expand Down Expand Up @@ -215,3 +219,57 @@ func TestIsDoNotReconcile(t *testing.T) {
b = IsDoNotReconcile(l)
assert.True(t, b)
}

// TestDuplicateCephClusters checks DuplicateCephClusters against a fake client seeded with
// one cluster, two clusters in different namespaces, and two clusters in the same namespace.
// Only the last scenario is expected to be reported as a duplicate.
func TestDuplicateCephClusters(t *testing.T) {
	const namespace = "rook-ceph"
	ctx := context.TODO()

	// newCluster builds a minimal CephCluster carrying only a name and a namespace.
	newCluster := func(name, ns string) *cephv1.CephCluster {
		return &cephv1.CephCluster{
			ObjectMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: ns,
			},
		}
	}
	cephCluster := newCluster("cluster-a", namespace)

	// Register the CephCluster types so the fake client can store and list them.
	s := scheme.Scheme
	s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephCluster{}, &cephv1.CephClusterList{})

	otherNamespaceCluster := newCluster("cluster-b", "anotherns")
	sameNamespaceCluster := newCluster("cluster-b", namespace)

	testCases := []struct {
		name     string
		existing []runtime.Object
		target   *cephv1.CephCluster
		log      bool
		want     bool
	}{
		{
			name:     "success - only one ceph cluster",
			existing: []runtime.Object{cephCluster},
			target:   cephCluster,
			log:      false,
			want:     false,
		},
		{
			name:     "success - we have more than one cluster but they are in different namespaces",
			existing: []runtime.Object{cephCluster, otherNamespaceCluster},
			target:   otherNamespaceCluster,
			log:      true,
			want:     false,
		},
		{
			name:     "fail - we have more than one cluster in the same namespace",
			existing: []runtime.Object{cephCluster, sameNamespaceCluster},
			target:   sameNamespaceCluster,
			log:      true,
			want:     true,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			// Create a fake client to mock API calls.
			cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tc.existing...).Build()
			got := DuplicateCephClusters(ctx, cl, tc.target, tc.log)
			if tc.want {
				assert.True(t, got)
			} else {
				assert.False(t, got)
			}
		})
	}
}
8 changes: 4 additions & 4 deletions pkg/operator/ceph/csi/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileCSI struct {
// Add creates a new Ceph CSI Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/operator/ceph/csi/predicate.go
Expand Up @@ -17,19 +17,22 @@ limitations under the License.
package csi

import (
"context"
"regexp"

"github.com/google/go-cmp/cmp"
cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -41,6 +44,10 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the csi driver
if cephCluster, ok := e.Object.(*cephv1.CephCluster); ok {
// If there are more than one ceph cluster in the same namespace do not reconcile
if controller.DuplicateCephClusters(ctx, c, e.Object, false) {
return false
}
// This allows us to avoid a double reconcile of the CSI controller if this is not
// the first generation of the CephCluster. So only return true if this is the very
// first instance of the CephCluster
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/ceph/object/bucket/controller.go
Expand Up @@ -54,7 +54,7 @@ type ReconcileBucket struct {
// Add creates a new Ceph object bucket controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, context *clusterd.Context, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(mgr, newReconciler(mgr, context, opManagerContext, opConfig))
return add(opManagerContext, mgr, newReconciler(mgr, context, opManagerContext, opConfig))
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -67,7 +67,7 @@ func newReconciler(mgr manager.Manager, context *clusterd.Context, opManagerCont
}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(ctx context.Context, mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
Expand All @@ -77,14 +77,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

// Watch for ConfigMap (operator config)
err = c.Watch(&source.Kind{
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &v1.ConfigMap{TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}

// Watch for CephCluster
err = c.Watch(&source.Kind{
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController())
Type: &cephv1.CephCluster{TypeMeta: metav1.TypeMeta{Kind: "CephCluster", APIVersion: v1.SchemeGroupVersion.String()}}}, &handler.EnqueueRequestForObject{}, predicateController(ctx, mgr.GetClient()))
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/operator/ceph/object/bucket/predicate.go
Expand Up @@ -17,17 +17,20 @@ limitations under the License.
package bucket

import (
"context"

cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
"github.com/rook/rook/pkg/operator/ceph/controller"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const rookOBCWatchOperatorNamespace = "ROOK_OBC_WATCH_OPERATOR_NAMESPACE"

// predicateController is the predicate function to trigger reconcile on operator configuration cm change
func predicateController() predicate.Funcs {
func predicateController(ctx context.Context, c client.Client) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
// if the operator configuration file is created we want to reconcile
Expand All @@ -39,8 +42,8 @@ func predicateController() predicate.Funcs {

// If a Ceph Cluster is created we want to reconcile the bucket provisioner
if _, ok := e.Object.(*cephv1.CephCluster); ok {
// Always return true, so when the controller starts we reconcile too. We don't get
return true
// If there are more than one ceph cluster in the same namespace do not reconcile
return !controller.DuplicateCephClusters(ctx, c, e.Object, false)
}

return false
Expand Down

0 comments on commit da76a77

Please sign in to comment.