Skip to content

Commit

Permalink
🐛 Fix cache sync timeout functionality
Browse files Browse the repository at this point in the history
So far, timing out the cache sync in most realistic scenarios didn't work,
because source.Kind gets an already started cache from the Manager. A
cache that is already started will block forever on GetInformer, which we
called in source.Kind's Start and not in its WaitForSync. The context passed
to Start, however, defines the entire lifecycle of the Source, not the
Start timeout.

This change makes source.Kind call GetInformer in Start but in a new
goroutine, and makes WaitForSync just wait for that to finish or for its
context to be cancelled. This preserves the existing semantics of
starting in Start but waiting for readiness in WaitForSync.
  • Loading branch information
alvaroaleman committed Mar 16, 2021
1 parent df2c43d commit 4d059e8
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 41 deletions.
44 changes: 22 additions & 22 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -104,40 +106,21 @@ var _ = Describe("controller", func() {
close(done)
})

It("should wait for each informer to sync", func(done Done) {
// TODO(directxman12): this test doesn't do what it says it does

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
}}

// Use a cancelled context so Start doesn't block
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())

close(done)
})

It("should error when cache sync timeout occurs", func(done Done) {
ctrl.CacheSyncTimeout = 10 * time.Nanosecond

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
c = &cacheWithIndefinitelyBlockingGetInformer{c}

ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}
ctrl.Name = "testcontroller"

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))

close(done)
})
Expand Down Expand Up @@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
}()
return s.SyncingSource.WaitForSync(ctx)
}

var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}

// cacheWithIndefinitelyBlockingGetInformer wraps a cache.Cache and has a GetInformer implementation that blocks
// until its context is cancelled (indefinitely if it never is).
// We need it as a workaround for testenv's lack of support for a secure apiserver: the insecure port always
// implies the allow-all authorizer, so we cannot simulate RBAC issues with it. RBAC issues are the usual cause
// of the real cache's GetInformer blocking in this way.
// TODO: Remove this once envtest supports a secure apiserver.
type cacheWithIndefinitelyBlockingGetInformer struct {
cache.Cache
}

// GetInformer ignores obj and blocks until ctx is done. It then always returns a nil Informer together with
// a "GetInformer timed out" error; it never returns successfully.
func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}
47 changes: 34 additions & 13 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Kind struct {

// cache used to watch APIs
cache cache.Cache

// started may contain an error if one was encountered during startup. If it is closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}

var _ SyncingSource = &Kind{}
Expand All @@ -110,16 +115,30 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
ks.started <- err
return
}
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()

return nil
}

Expand All @@ -133,11 +152,13 @@ func (ks *Kind) String() string {
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
return errors.New("cache did not sync")
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
return errors.New("timed out waiting for cache to be synced")
}
return nil
}

var _ inject.Cache = &Kind{}
Expand Down
18 changes: 12 additions & 6 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -133,6 +134,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -178,6 +180,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -208,10 +211,11 @@ var _ = Describe("Source", func() {
})

It("should return an error if syncing fails", func(done Done) {
instance := source.Kind{}
instance := source.Kind{Type: &corev1.Pod{}}
f := false
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
err := instance.WaitForSync(nil)
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand All @@ -220,7 +224,7 @@ var _ = Describe("Source", func() {
})

Context("for a Kind not in the cache", func() {
It("should return an error when Start is called", func(done Done) {
It("should return an error when WaitForSync is called", func(done Done) {
ic.Error = fmt.Errorf("test error")
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

Expand All @@ -229,7 +233,8 @@ var _ = Describe("Source", func() {
}
Expect(instance.InjectCache(ic)).To(Succeed())
err := instance.Start(ctx, handler.Funcs{}, q)
Expect(err).To(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())

close(done)
})
Expand All @@ -246,8 +251,9 @@ var _ = Describe("Source", func() {

It("should return an error if syncing fails", func(done Done) {
f := false
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
err := instance.WaitForSync(nil)
instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f})
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand Down

0 comments on commit 4d059e8

Please sign in to comment.