Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix cache sync timeout functionality #1428

Merged
merged 1 commit into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 22 additions & 22 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -104,40 +106,21 @@ var _ = Describe("controller", func() {
close(done)
})

It("should wait for each informer to sync", func(done Done) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing this test ever tested was that controller.Start with a cancelled context doesn't error. With the changes in this PR, however, it will always error, because the context passed from the Controller to WaitForSync is cancelled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case do you think it's worth having a test case that checks that an error is always returned if the context is cancelled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO the only thing someone who starts a Controller with a cancelled context can realistically expect is that it ends quickly; whether or not it errors along the way ends up being an implementation detail. It is not going to result in anything useful.

// TODO(directxman12): this test doesn't do what it says it does

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
}}

// Use a cancelled context so Start doesn't block
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())

close(done)
})

It("should error when cache sync timeout occurs", func(done Done) {
ctrl.CacheSyncTimeout = 10 * time.Nanosecond

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
c = &cacheWithIndefinitelyBlockingGetInformer{c}

ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}
ctrl.Name = "testcontroller"

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))

close(done)
})
Expand Down Expand Up @@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
}()
return s.SyncingSource.WaitForSync(ctx)
}

var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}

// cacheWithIndefinitelyBlockingGetInformer wraps a cache.Cache and overrides GetInformer with an implementation
// that blocks until its context is cancelled and then returns an error.
// We need it as a workaround for testenv's lack of support for a secure apiserver: the insecure port always
// implies the allow-all authorizer, so we cannot simulate RBAC issues with it. RBAC issues are the usual reason
// the real cache's GetInformer blocks like this.
// TODO: Remove this once envtest supports a secure apiserver.
type cacheWithIndefinitelyBlockingGetInformer struct {
cache.Cache
}

// GetInformer ignores obj, blocks until ctx is done, and then always returns a nil Informer together with a
// "GetInformer timed out" error. It never returns successfully.
func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
// Wait for the caller's context to be cancelled or to expire before returning.
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}
47 changes: 34 additions & 13 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Kind struct {

// cache used to watch APIs
cache cache.Cache

// started may contain an error if one was encountered during startup. If it is closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}

var _ SyncingSource = &Kind{}
Expand All @@ -110,16 +115,30 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
ks.started <- err
return
}
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()

return nil
}

Expand All @@ -133,11 +152,13 @@ func (ks *Kind) String() string {
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
return errors.New("cache did not sync")
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
return errors.New("timed out waiting for cache to be synced")
}
return nil
}

var _ inject.Cache = &Kind{}
Expand Down
18 changes: 12 additions & 6 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to call WaitForSync here because, until it has finished, the Informer won't have the EventHandler attached. Not waiting makes the test racy.


i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -133,6 +134,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -178,6 +180,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -208,10 +211,11 @@ var _ = Describe("Source", func() {
})

It("should return an error if syncing fails", func(done Done) {
instance := source.Kind{}
instance := source.Kind{Type: &corev1.Pod{}}
f := false
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
err := instance.WaitForSync(nil)
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand All @@ -220,7 +224,7 @@ var _ = Describe("Source", func() {
})

Context("for a Kind not in the cache", func() {
It("should return an error when Start is called", func(done Done) {
It("should return an error when WaitForSync is called", func(done Done) {
ic.Error = fmt.Errorf("test error")
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

Expand All @@ -229,7 +233,8 @@ var _ = Describe("Source", func() {
}
Expect(instance.InjectCache(ic)).To(Succeed())
err := instance.Start(ctx, handler.Funcs{}, q)
Expect(err).To(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())

close(done)
})
Expand All @@ -246,8 +251,9 @@ var _ = Describe("Source", func() {

It("should return an error if syncing fails", func(done Done) {
f := false
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
err := instance.WaitForSync(nil)
instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f})
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand Down