Skip to content

Commit

Permalink
Event, source, handler, predicates: Use generics
Browse files Browse the repository at this point in the history
This change add generic versions of event, handler and predicates along
the existing ones, prefixed with `Typed`. The existing ones are left in
place under the same signature.

The `source` constructors are changed to be generic.
  • Loading branch information
alvaroaleman committed Apr 20, 2024
1 parent 9931919 commit 251979f
Show file tree
Hide file tree
Showing 22 changed files with 527 additions and 330 deletions.
4 changes: 2 additions & 2 deletions examples/builtins/main.go
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
53 changes: 28 additions & 25 deletions pkg/builder/controller.go
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -56,6 +55,7 @@ const (
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
Expand Down Expand Up @@ -123,7 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
obj client.Object
handler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -132,15 +133,19 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{}
input := WatchesInput{
obj: object,
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)

return blder.WatchesRawSource(src, opts...)
blder.watchesInput = append(blder.watchesInput, input)

return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -180,13 +185,11 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand Down Expand Up @@ -312,22 +315,22 @@ func (blder *Builder) doWatch() error {
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
allPredicates := append(allPredicates, w.predicates...)
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
projected, err := blder.project(w.obj, w.objectProjection)
if err != nil {
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
return err
}
if err := blder.ctrl.Watch(w.src); err != nil {
}
for _, src := range blder.rawSources {
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/controller_test.go
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
Named("my_controller").
Build(noop)
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")))
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")))
Expect(instance).To(BeNil())
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_integration_test.go
Expand Up @@ -66,12 +66,12 @@ var _ = Describe("controller", func() {
By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/event/event.go
Expand Up @@ -18,38 +18,55 @@ package event

import "sigs.k8s.io/controller-runtime/pkg/client"

// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
type CreateEvent = TypedCreateEvent[client.Object]

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent = TypedUpdateEvent[client.Object]

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type CreateEvent struct {
type DeleteEvent = TypedDeleteEvent[client.Object]

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent = TypedGenericEvent[client.Object]

// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent struct {
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct {
// ObjectOld is the object from the event
ObjectOld client.Object
ObjectOld T

// ObjectNew is the object from the event
ObjectNew client.Object
ObjectNew T
}

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type DeleteEvent struct {
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T

// DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted.
DeleteStateUnknown bool
}

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler.
type TypedGenericEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}
41 changes: 30 additions & 11 deletions pkg/handler/enqueue.go
Expand Up @@ -18,9 +18,11 @@ package handler

import (
"context"
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -33,13 +35,18 @@ type empty struct{}
var _ EventHandler = &EnqueueRequestForObject{}

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type EnqueueRequestForObject struct{}
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]

// TypedEnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type TypedEnqueueRequestForObject[T client.Object] struct{}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
Expand All @@ -50,14 +57,14 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv
}

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) {
switch {
case evt.ObjectNew != nil:
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
case evt.ObjectOld != nil:
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
Expand All @@ -68,8 +75,8 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv
}

// Delete implements EventHandler.
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
Expand All @@ -80,8 +87,8 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
}

// Generic implements EventHandler.
func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
}
Expand All @@ -90,3 +97,15 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
Namespace: evt.Object.GetNamespace(),
}})
}

func isNil(arg any) bool {
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
v.Kind() == reflect.Interface ||
v.Kind() == reflect.Slice ||
v.Kind() == reflect.Map ||
v.Kind() == reflect.Chan ||
v.Kind() == reflect.Func) && v.IsNil()) {
return true
}
return false
}

0 comments on commit 251979f

Please sign in to comment.