Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃尡 Source should retry to get informers until timeout expires #1678

Merged
merged 1 commit into from Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 26 additions & 7 deletions pkg/source/source.go
Expand Up @@ -21,8 +21,10 @@ import (
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
var (
i cache.Informer
lastErr error
)

// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(lastErr, &kindMatchErr) {
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}

i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
Expand Down
6 changes: 5 additions & 1 deletion pkg/source/source_test.go
Expand Up @@ -19,6 +19,7 @@ package source_test
import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -218,13 +219,16 @@ var _ = Describe("Source", func() {
ic.Error = fmt.Errorf("test error")
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

instance := &source.Kind{
Type: &corev1.Pod{},
}
Expect(instance.InjectCache(ic)).To(Succeed())
err := instance.Start(ctx, handler.Funcs{}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())
Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred())
})
})
})
Expand Down