Skip to content

Commit

Permalink
Event, source, handler, predicates: Use generics
Browse files Browse the repository at this point in the history
This change add generic versions of event, handler and predicates along
the existing ones, prefixed with `Typed`. The existing ones are left in
place under the same signature.

The `source` constructors are changed to be generic.
  • Loading branch information
alvaroaleman committed Apr 17, 2024
1 parent 9931919 commit b71b256
Show file tree
Hide file tree
Showing 20 changed files with 501 additions and 297 deletions.
4 changes: 2 additions & 2 deletions examples/builtins/main.go
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
45 changes: 23 additions & 22 deletions pkg/builder/controller.go
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -56,6 +55,7 @@ const (
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
Expand Down Expand Up @@ -123,7 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
obj client.Object
handler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -134,13 +135,17 @@ type WatchesInput struct {
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{}
input := WatchesInput{
obj: object,
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)

return blder.WatchesRawSource(src, opts...)
blder.watchesInput = append(blder.watchesInput, input)

return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -180,13 +185,9 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand Down Expand Up @@ -312,22 +313,22 @@ func (blder *Builder) doWatch() error {
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
allPredicates := append(allPredicates, w.predicates...)
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
projected, err := blder.project(w.obj, w.objectProjection)
if err != nil {
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
}
if err := blder.ctrl.Watch(w.src); err != nil {
allPredicates := append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
return err
}
}
for _, src := range blder.rawSources {
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_integration_test.go
Expand Up @@ -66,12 +66,12 @@ var _ = Describe("controller", func() {
By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/event/event.go
Expand Up @@ -18,38 +18,55 @@ package event

import "sigs.k8s.io/controller-runtime/pkg/client"

// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
type CreateEvent = TypedCreateEvent[client.Object]

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent = TypedUpdateEvent[client.Object]

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type CreateEvent struct {
type DeleteEvent = TypedDeleteEvent[client.Object]

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent = TypedGenericEvent[client.Object]

// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent struct {
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct {
// ObjectOld is the object from the event
ObjectOld client.Object
ObjectOld T

// ObjectNew is the object from the event
ObjectNew client.Object
ObjectNew T
}

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type DeleteEvent struct {
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T

// DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted.
DeleteStateUnknown bool
}

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler.
type TypedGenericEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}
41 changes: 30 additions & 11 deletions pkg/handler/enqueue.go
Expand Up @@ -18,9 +18,11 @@ package handler

import (
"context"
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -33,13 +35,18 @@ type empty struct{}
var _ EventHandler = &EnqueueRequestForObject{}

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type EnqueueRequestForObject struct{}
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]

// TypedEnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type TypedEnqueueRequestForObject[T client.Object] struct{}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
Expand All @@ -50,14 +57,14 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv
}

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) {
switch {
case evt.ObjectNew != nil:
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
case evt.ObjectOld != nil:
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
Expand All @@ -68,8 +75,8 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv
}

// Delete implements EventHandler.
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
Expand All @@ -80,8 +87,8 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
}

// Generic implements EventHandler.
func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
}
Expand All @@ -90,3 +97,15 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
Namespace: evt.Object.GetNamespace(),
}})
}

func isNil(arg any) bool {
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
v.Kind() == reflect.Interface ||
v.Kind() == reflect.Slice ||
v.Kind() == reflect.Map ||
v.Kind() == reflect.Chan ||
v.Kind() == reflect.Func) && v.IsNil()) {
return true
}
return false
}

0 comments on commit b71b256

Please sign in to comment.