Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Allow non-blocking retrieval of informers #2371

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,15 @@ type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(ctx, obj)
return c.Cache.GetInformer(ctx, obj, opts...)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ var (
defaultSyncPeriod = 10 * time.Hour
)

// InformerGetOptions defines the behavior of how informers are retrieved.
type InformerGetOptions internal.GetOptions

// InformerGetOption defines an option that alters the behavior of how informers are retrieved.
type InformerGetOption func(*InformerGetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *InformerGetOptions) {
opts.BlockUntilSynced = &shouldBlock
}
}

// Cache knows how to load Kubernetes objects, fetch informers to request
// to receive events for Kubernetes objects (at a low-level),
// and add indices to fields on the objects stored in the cache.
Expand All @@ -61,11 +75,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Expand Down Expand Up @@ -187,6 +201,9 @@ type Options struct {
// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
// object, this will fall through to Default* settings.
ByObject map[client.Object]ByObject

// newInformer allows overriding of NewSharedIndexInformer for testing.
newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer
}

// ByObject offers more fine-grained control over the cache's ListWatch by object.
Expand Down Expand Up @@ -337,6 +354,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
},
Transform: config.Transform,
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
NewInformer: opts.newInformer,
}),
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
}
Expand Down
91 changes: 90 additions & 1 deletion pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -43,6 +44,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
)

const testNodeOne = "test-node-1"
Expand Down Expand Up @@ -117,6 +119,7 @@ func deletePod(pod client.Object) {

var _ = Describe("Informer Cache", func() {
CacheTest(cache.New, cache.Options{})
NonBlockingGetTest(cache.New, cache.Options{})
})

var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() {
Expand All @@ -131,12 +134,22 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
"default": {},
},
})
NonBlockingGetTest(cache.New, cache.Options{
DefaultNamespaces: map[string]cache.Config{
testNamespaceOne: {},
testNamespaceTwo: {},
"default": {},
},
})
})

var _ = Describe("Informer Cache without global DeepCopy", func() {
CacheTest(cache.New, cache.Options{
DefaultUnsafeDisableDeepCopy: pointer.Bool(true),
})
NonBlockingGetTest(cache.New, cache.Options{
DefaultUnsafeDisableDeepCopy: pointer.Bool(true),
})
})

var _ = Describe("Cache with transformers", func() {
Expand Down Expand Up @@ -440,7 +453,6 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf
BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating the informer cache")
var err error
informerCache, err = createCacheFunc(cfg, opts)
Expand Down Expand Up @@ -507,6 +519,83 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf
})
}

func NonBlockingGetTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("non-blocking get test", func() {
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
)
BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating expected namespaces")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())

By("creating the informer cache")
v := reflect.ValueOf(&opts).Elem()
newInformerField := v.FieldByName("newInformer")
newFakeInformer := func(_ kcache.ListerWatcher, _ runtime.Object, _ time.Duration, _ kcache.Indexers) kcache.SharedIndexInformer {
return &controllertest.FakeInformer{Synced: false}
}
reflect.NewAt(newInformerField.Type(), newInformerField.Addr().UnsafePointer()).
Elem().
Set(reflect.ValueOf(&newFakeInformer))
informerCache, err = createCacheFunc(cfg, opts)
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
By("cleaning up created pods")
informerCacheCancel()
})

Describe("as an Informer", func() {
It("should be able to get informer for the object without blocking", func() {
By("getting a shared index informer for a pod")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "informer-obj",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
sii, err := informerCache.GetInformer(ctx, pod, cache.BlockUntilSynced(false))
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeFalse())
})
})
})
}

func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("Cache test", func() {
var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/delegating_by_gvk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return nil, err
}
return cache.GetInformer(ctx, obj)
return cache.GetInformer(ctx, obj, opts...)
}

func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk)
func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
}

func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
Expand Down
18 changes: 13 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,29 +141,37 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
return &gvk, cacheTypeObj, nil
}

func applyGetOptions(opts ...InformerGetOption) *internal.GetOptions {
cfg := &InformerGetOptions{}
for _, opt := range opts {
opt(cfg)
}
return (*internal.GetOptions)(cfg)
}

// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
if err != nil {
return nil, err
}
return i.Informer, nil
}

// GetInformer returns the informer for the obj. If no informer exists, one will be started.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
if err != nil {
return nil, err
}
Expand All @@ -179,7 +187,7 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return started, cache, nil
}

return ic.Informers.Get(ctx, gvk, obj)
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type FakeInformers struct {
}

// GetInformerForKind implements Informers.
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand All @@ -61,7 +61,7 @@ func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.Grou
}

// GetInformer implements Informers.
func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ type InformersOpts struct {
Mapper meta.RESTMapper
ResyncPeriod time.Duration
Namespace string
NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
Selector Selector
Transform cache.TransformFunc
UnsafeDisableDeepCopy bool
}

// NewInformers creates a new InformersMap that can create informers under the hood.
func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
newInformer := cache.NewSharedIndexInformer
if options.NewInformer != nil {
newInformer = *options.NewInformer
}
return &Informers{
config: config,
httpClient: options.HTTPClient,
Expand All @@ -70,6 +75,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
selector: options.Selector,
transform: options.Transform,
unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
newInformer: newInformer,
}
}

Expand All @@ -88,6 +94,13 @@ type tracker struct {
Metadata map[schema.GroupVersionKind]*Cache
}

// GetOptions provides configuration to customize the behavior when
// getting an informer.
type GetOptions struct {
sbueringer marked this conversation as resolved.
Show resolved Hide resolved
// BlockUntilSynced controls if the informer retrieval will block until the informer is synced. Defaults to `true`.
BlockUntilSynced *bool
}

// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
Expand Down Expand Up @@ -143,6 +156,9 @@ type Informers struct {
selector Selector
transform cache.TransformFunc
unsafeDisableDeepCopy bool

// NewInformer allows overriding of the shared index informer constructor for testing.
newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -240,7 +256,7 @@ func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.Peek(gvk, obj)
if !ok {
Expand All @@ -250,7 +266,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}
}

if started && !i.Informer.HasSynced() {
shouldBlock := true
if opts.BlockUntilSynced != nil {
shouldBlock = *opts.BlockUntilSynced
}

if shouldBlock && started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
Expand Down Expand Up @@ -288,7 +309,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
if err != nil {
return nil, false, err
}
sharedIndexInformer := cache.NewSharedIndexInformer(&cache.ListWatch{
sharedIndexInformer := ip.newInformer(&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selector.ApplyToList(&opts)
return listWatcher.ListFunc(opts)
Expand Down