Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ Source, Event, Predicate, Handler: Add generics support #2783

Merged
merged 2 commits into from Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/builtins/main.go
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
60 changes: 34 additions & 26 deletions pkg/builder/controller.go
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -56,6 +55,7 @@ const (
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
Expand Down Expand Up @@ -123,8 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
obj client.Object
handler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -133,10 +133,19 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
src := source.Kind(blder.mgr.GetCache(), object)
return blder.WatchesRawSource(src, eventHandler, opts...)
input := WatchesInput{
obj: object,
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}

blder.watchesInput = append(blder.watchesInput, input)

return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -176,13 +185,11 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand Down Expand Up @@ -272,11 +279,11 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -290,7 +297,6 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand All @@ -302,27 +308,29 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
projected, err := blder.project(w.obj, w.objectProjection)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here a change suggested in #2685 will fit better. Projection might not work for different sources.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Projection might not work for different sources.

This is only used to construct a source.Kind.

I think here a change suggested in #2685 will fit better.

I disagree. Both what we did previously and what is done over there is IMHO hacky and will not work after the Source was started. Deferring the creation and creating it with the right args rather than manipulating it later on is IMHO much cleaner.

if err != nil {
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
return err
}
}
for _, src := range blder.rawSources {
if err := blder.ctrl.Watch(src); err != nil {
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/controller_test.go
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
Named("my_controller").
Build(noop)
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")))
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")))
Expect(instance).To(BeNil())
})

Expand Down
11 changes: 2 additions & 9 deletions pkg/controller/controller.go
Expand Up @@ -25,10 +25,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -84,13 +82,8 @@ type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler

// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// Watch watches the provided Source.
Watch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/controller_integration_test.go
Expand Up @@ -65,12 +65,13 @@ var _ = Describe("controller", func() {

By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand All @@ -101,7 +101,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(c.Watch(watch)).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/event/event.go
Expand Up @@ -18,38 +18,55 @@ package event

import "sigs.k8s.io/controller-runtime/pkg/client"

// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
type CreateEvent = TypedCreateEvent[client.Object]

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent = TypedUpdateEvent[client.Object]

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type CreateEvent struct {
type DeleteEvent = TypedDeleteEvent[client.Object]

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent = TypedGenericEvent[client.Object]

// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent struct {
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that having a typed EventHandler here is not as convenient as to have a method which will infer the type of the object. Client-go works with 2 interfaces under the hood, passed in a method if I’m not mistaken. Can this be implemented this way but with generics?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean a constructor for events and unexporting the current types? That would be a breaking change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather leave Event structures for non generic types without any change, and introduce a new handler for generic types. I did it this way: https://github.com/Danil-Grigorev/controller-runtime/blob/4d770047de3a10110f5ded3bce46c178d68d0500/pkg/handler/eventhandler.go#L61-L73

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of that? That means sources have to be able to deal with either or or there needs to be some sort of adapter to convert, both of which is very inconvenient. How does this improve the overall UX?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly for the ability to infer type, I’d say. Anytime that you specify [anything] before the method, the syntax highlighting breaks. Might be only my personal observation, env, etc. The goal is that as controller-runtime is a library, exposing a list of interfaces to extend (in case you need this), having one which unconditionally requires to use generics might make it hard for newcomers to implement custom logic. With a method one can always use just *corev1.Pod as an arg to custom EventHandler implementation and won’t have to use advanced features immediately. And wrapping EventHandle into constructors to eliminate the issue didn’t make sense to me. I can’t imagine how a structure for events may grow beyond just hosting old/new objects, which is low enough to consider a method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is that as controller-runtime is a library, exposing a list of interfaces to extend (in case you need this), having one which unconditionally requires to use generics might make it hard for newcomers to implement custom logic.

Both your and my approach leave the currently-exported interfaces in place, they have the same shape in both approaches. The difference in your approach is that because EventHandler is not an alias to TypedEventHandler, a Source can only support one of the two, because for the go type system those are different types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m trying to find a reason why I abandoned aliasing in the first place. Yes, it seems to be working, and having to specify a typedEvent[*Pod] is a minor inconvenience. My guess would be that the issue discussion lead to overcomplicating the solution. But I see a difference with using Source().Prepare() that the current builder logic was not refactored in my implementation. This means there will not be issues with existing functionality

// ObjectOld is the object from the event
ObjectOld client.Object
ObjectOld T

// ObjectNew is the object from the event
ObjectNew client.Object
ObjectNew T
}

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type DeleteEvent struct {
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T

// DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted.
DeleteStateUnknown bool
}

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler.
type TypedGenericEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}