Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Prevent race when informers are started more than once #2758

Merged
merged 1 commit into from Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/cache/cache.go
Expand Up @@ -39,11 +39,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
)

var (
log = logf.RuntimeLog.WithName("object-cache")
defaultSyncPeriod = 10 * time.Hour
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/cache_test.go
Expand Up @@ -1849,6 +1849,12 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
)
})
Describe("as an Informer", func() {
It("should error when starting the cache a second time", func() {
err := informerCache.Start(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("Informer already started"))
})

Context("with structured objects", func() {
It("should be able to get informer for the object", func() {
By("getting a shared index informer for a pod")
Expand Down
13 changes: 11 additions & 2 deletions pkg/cache/internal/informers.go
Expand Up @@ -18,6 +18,7 @@ package internal

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
Expand Down Expand Up @@ -186,10 +187,14 @@ type Informers struct {
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
// It returns an error if the informers have already been started.
func (ip *Informers) Start(ctx context.Context) error {
func() {
if err := func() error {
ip.mu.Lock()
defer ip.mu.Unlock()

if ip.started {
return errors.New("Informer already started") //nolint:stylecheck
}

// Set the context so it can be passed to informers that are added later
ip.ctx = ctx

Expand All @@ -207,7 +212,11 @@ func (ip *Informers) Start(ctx context.Context) error {
// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()

return nil
}(); err != nil {
return err
}
<-ctx.Done() // Block until the context is done
ip.mu.Lock()
ip.stopped = true // Set stopped to true so we don't start any new informers
Expand Down
14 changes: 9 additions & 5 deletions pkg/cache/multi_namespace_cache.go
Expand Up @@ -163,12 +163,13 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
}

func (c *multiNamespaceCache) Start(ctx context.Context) error {
errs := make(chan error)
// start global cache
if c.clusterCache != nil {
go func() {
err := c.clusterCache.Start(ctx)
if err != nil {
log.Error(err, "cluster scoped cache failed to start")
errs <- fmt.Errorf("failed to start cluster-scoped cache: %w", err)
}
}()
}
Expand All @@ -177,13 +178,16 @@ func (c *multiNamespaceCache) Start(ctx context.Context) error {
for ns, cache := range c.namespaceToCache {
go func(ns string, cache Cache) {
if err := cache.Start(ctx); err != nil {
log.Error(err, "multi-namespace cache failed to start namespaced informer", "namespace", ns)
errs <- fmt.Errorf("failed to start cache for namespace %s: %w", ns, err)
}
}(ns, cache)
}

<-ctx.Done()
return nil
select {
case <-ctx.Done():
return nil
case err := <-errs:
return err
}
}

func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
Expand Down