Skip to content

Commit

Permalink
Re-implement gvkFixupWatcher as a watch.FilterFunc
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Wall <richard.wall@jetstack.io>
  • Loading branch information
wallrj committed Feb 4, 2022
1 parent 0fbdc4b commit 5f4904e
Showing 1 changed file with 26 additions and 34 deletions.
60 changes: 26 additions & 34 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,41 +409,33 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
}, nil
}

type gvkFixupWatcher struct {
watcher watch.Interface
ch chan watch.Event
gvk schema.GroupVersionKind
wg sync.WaitGroup
}

// newGVKFixupWatcher adds a wrapper that preserves the GVK information when
// events come in.
//
// This works around a bug where GVK information is not passed into mapping
// functions when using the OnlyMetadata option in the builder.
// This issue is most likely caused by kubernetes/kubernetes#80609.
// See kubernetes-sigs/controller-runtime#1484.
//
// This was originally implemented as a cache.ResourceEventHandler wrapper but
// that contained a data race which was resolved by setting the GVK in a watch
// wrapper, before the objects are written to the cache.
// See kubernetes-sigs/controller-runtime#1650.
//
// The original watch wrapper was found to be incompatible with
// k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a
// watch.Filter which is compatible.
// See kubernetes-sigs/controller-runtime#1789.
func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
ch := make(chan watch.Event)
w := &gvkFixupWatcher{
gvk: gvk,
watcher: watcher,
ch: ch,
}
w.wg.Add(1)
go w.run()
return w
}

func (w *gvkFixupWatcher) run() {
for e := range w.watcher.ResultChan() {
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
w.ch <- e
}
w.wg.Done()
}

func (w *gvkFixupWatcher) Stop() {
w.watcher.Stop()
w.wg.Wait()
close(w.ch)
}

func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
return w.ch
return watch.Filter(
watcher,
func(in watch.Event) (out watch.Event, keep bool) {
keep = true
in.DeepCopyInto(&out)
out.Object.GetObjectKind().SetGroupVersionKind(gvk)
return out, keep
},
)
}

// resyncPeriod returns a function which generates a duration each time it is
Expand Down

0 comments on commit 5f4904e

Please sign in to comment.