Skip to content

Commit

Permalink
Merge pull request #2758 from alvaroaleman/fix-race
Browse files Browse the repository at this point in the history
🐛 Prevent race when informers are started more than once
  • Loading branch information
k8s-ci-robot committed Apr 9, 2024
2 parents 01f3f84 + 6dd03ed commit 290ce58
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
2 changes: 0 additions & 2 deletions pkg/cache/cache.go
Expand Up @@ -39,11 +39,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
)

var (
log = logf.RuntimeLog.WithName("object-cache")
defaultSyncPeriod = 10 * time.Hour
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/cache_test.go
Expand Up @@ -1849,6 +1849,12 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
)
})
Describe("as an Informer", func() {
It("should error when starting the cache a second time", func() {
err := informerCache.Start(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("Informer already started"))
})

Context("with structured objects", func() {
It("should be able to get informer for the object", func() {
By("getting a shared index informer for a pod")
Expand Down
13 changes: 11 additions & 2 deletions pkg/cache/internal/informers.go
Expand Up @@ -18,6 +18,7 @@ package internal

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
Expand Down Expand Up @@ -186,10 +187,14 @@ type Informers struct {
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *Informers) Start(ctx context.Context) error {
func() {
if err := func() error {
ip.mu.Lock()
defer ip.mu.Unlock()

if ip.started {
return errors.New("Informer already started") //nolint:stylecheck
}

// Set the context so it can be passed to informers that are added later
ip.ctx = ctx

Expand All @@ -207,7 +212,11 @@ func (ip *Informers) Start(ctx context.Context) error {
// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()

return nil
}(); err != nil {
return err
}
<-ctx.Done() // Block until the context is done
ip.mu.Lock()
ip.stopped = true // Set stopped to true so we don't start any new informers
Expand Down
14 changes: 9 additions & 5 deletions pkg/cache/multi_namespace_cache.go
Expand Up @@ -163,12 +163,13 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema
}

func (c *multiNamespaceCache) Start(ctx context.Context) error {
errs := make(chan error)
// start global cache
if c.clusterCache != nil {
go func() {
err := c.clusterCache.Start(ctx)
if err != nil {
log.Error(err, "cluster scoped cache failed to start")
errs <- fmt.Errorf("failed to start cluster-scoped cache: %w", err)
}
}()
}
Expand All @@ -177,13 +178,16 @@ func (c *multiNamespaceCache) Start(ctx context.Context) error {
for ns, cache := range c.namespaceToCache {
go func(ns string, cache Cache) {
if err := cache.Start(ctx); err != nil {
log.Error(err, "multi-namespace cache failed to start namespaced informer", "namespace", ns)
errs <- fmt.Errorf("failed to start cache for namespace %s: %w", ns, err)
}
}(ns, cache)
}

<-ctx.Done()
return nil
select {
case <-ctx.Done():
return nil
case err := <-errs:
return err
}
}

func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
Expand Down

0 comments on commit 290ce58

Please sign in to comment.