From 4af39e6fdee354633966d5c1c0dc9a948d1cd35f Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 4 Oct 2021 11:32:22 -0700 Subject: [PATCH] :seedling: Source should retry to get informers until timeout expires Signed-off-by: Vince Prignano --- pkg/source/source.go | 33 ++++++++++++++++++++++++++------- pkg/source/source_test.go | 6 +++++- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/pkg/source/source.go b/pkg/source/source.go index a63b37c443..708c5a5bfc 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w ctx, ks.startCancel = context.WithCancel(ctx) ks.started = make(chan error) go func() { - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(ctx, ks.Type) - if err != nil { - kindMatchErr := &meta.NoKindMatchError{} - if errors.As(err, &kindMatchErr) { - log.Error(err, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) + var ( + i cache.Informer + lastErr error + ) + + // Tries to get an informer until it returns true, + // an error or the specified context is cancelled or expired. + if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + i, lastErr = ks.cache.GetInformer(ctx, ks.Type) + if lastErr != nil { + kindMatchErr := &meta.NoKindMatchError{} + if errors.As(lastErr, &kindMatchErr) { + log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", + "kind", kindMatchErr.GroupKind) + } + return false, nil // Retry. + } + return true, nil + }); err != nil { + if lastErr != nil { + ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr) + return } ks.started <- err return } + i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 966e1bb95f..70c708df08 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -19,6 +19,7 @@ package source_test import ( "context" "fmt" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -218,13 +219,16 @@ var _ = Describe("Source", func() { ic.Error = fmt.Errorf("test error") q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + instance := &source.Kind{ Type: &corev1.Pod{}, } Expect(instance.InjectCache(ic)).To(Succeed()) err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).NotTo(HaveOccurred()) - Expect(instance.WaitForSync(context.Background())).To(HaveOccurred()) + Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred()) }) }) })