Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable default paging in list watches #51876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/tests/listwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestListWatchesCanList(t *testing.T) {
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
lw := NewListWatchFromClient(client.Core().RESTClient(), item.resource, item.namespace, item.fieldSelector)
lw.DisablePaging = true
lw.DisableChunking = true
// This test merely tests that the correct request is made.
lw.List(metav1.ListOptions{})
handler.ValidateRequest(t, item.location, "GET", nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestGCListWatcher(t *testing.T) {
t.Fatal(err)
}
lw := listWatcher(client, podResource)
lw.DisablePaging = true
lw.DisableChunking = true
if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 8 additions & 4 deletions staging/src/k8s.io/client-go/tools/cache/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
DisablePaging bool
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in
// 1.9 will allow a controller to opt out of chunking.
DisableChunking bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be named in the positive, especially since that would get the default behavior you want?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan was in 1.9 to remove the false && below. All of the test suites are already configured to bypass chunking by setting this flag, whereas all clients would still be opt out for beta. I wanted to avoid pointer insanity in internal code as well as bad globals (since this is core library code).

}

// Getter interface knows how to access Get method from RESTClient.
Expand Down Expand Up @@ -91,7 +93,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration {

// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisablePaging {
// chunking will become the default for list watchers starting in Kubernetes 1.9, unless
// otherwise disabled.
if false && !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
}
return lw.ListFunc(options)
Expand Down
15 changes: 15 additions & 0 deletions staging/src/k8s.io/client-go/tools/pager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_library(
Expand Down Expand Up @@ -34,3 +35,17 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

go_test(
name = "go_default_test",
srcs = ["pager_test.go"],
library = ":go_default_library",
deps = [
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
8 changes: 6 additions & 2 deletions staging/src/k8s.io/client-go/tools/pager/pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type ListPager struct {
}

// New creates a new pager from the provided pager function using the default
// options.
// options. It will fall back to a full list if an expiration error is encountered
// as a last resort.
func New(fn ListPageFunc) *ListPager {
return &ListPager{
PageSize: defaultPageSize,
Expand All @@ -61,9 +62,12 @@ func New(fn ListPageFunc) *ListPager {
}
}

// TODO: introduce other types of paging functions - such as those that retrieve from a list
// of namespaces.

// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead.
// the full list instead. The Limit field on options, if unset, will default to the page size.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if options.Limit == 0 {
options.Limit = p.PageSize
Expand Down
206 changes: 206 additions & 0 deletions staging/src/k8s.io/client-go/tools/pager/pager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pager

import (
"fmt"
"reflect"
"testing"

"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1alpha1 "k8s.io/apimachinery/pkg/apis/meta/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
)

func list(count int, rv string) *metainternalversion.List {
var list metainternalversion.List
for i := 0; i < count; i++ {
list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
})
}
list.ResourceVersion = rv
return &list
}

type testPager struct {
t *testing.T
rv string
index int
remaining int
last int
continuing bool
done bool
expectPage int64
}

func (p *testPager) reset() {
p.continuing = false
p.remaining += p.index
p.index = 0
p.last = 0
p.done = false
}

func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.done {
p.t.Errorf("did not expect additional call to paged list")
return nil, fmt.Errorf("unexpected list call")
}
expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last)
if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) {
p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
return nil, fmt.Errorf("invariant violated")
}
var list metainternalversion.List
total := options.Limit
if total == 0 {
total = int64(p.remaining)
}
for i := int64(0); i < total; i++ {
if p.remaining <= 0 {
break
}
list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", p.index),
},
})
p.remaining--
p.index++
}
p.last = p.index
if p.remaining > 0 {
list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last)
p.continuing = true
} else {
p.done = true
}
list.ResourceVersion = p.rv
return &list, nil
}

func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.continuing {
p.done = true
return nil, errors.NewResourceExpired("this list has expired")
}
return p.PagedList(ctx, options)
}

func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if p.continuing {
p.reset()
p.expectPage = 0
return nil, errors.NewResourceExpired("this list has expired")
}
return p.PagedList(ctx, options)
}

func TestListPager_List(t *testing.T) {
type fields struct {
PageSize int64
PageFn ListPageFunc
FullListIfExpired bool
}
type args struct {
ctx context.Context
options metav1.ListOptions
}
tests := []struct {
name string
fields fields
args args
want runtime.Object
wantErr bool
isExpired bool
}{
{
name: "empty page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
args: args{},
want: list(0, "rv:20"),
},
{
name: "one page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
args: args{},
want: list(9, "rv:20"),
},
{
name: "one full page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
args: args{},
want: list(10, "rv:20"),
},
{
name: "two pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{},
want: list(11, "rv:20"),
},
{
name: "three pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
args: args{},
want: list(21, "rv:20"),
},
{
name: "expires on second page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
args: args{},
wantErr: true,
isExpired: true,
},
{
name: "expires on second page and then lists",
fields: fields{
FullListIfExpired: true,
PageSize: 10,
PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
},
args: args{},
want: list(21, "rv:20"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &ListPager{
PageSize: tt.fields.PageSize,
PageFn: tt.fields.PageFn,
FullListIfExpired: tt.fields.FullListIfExpired,
}
got, err := p.List(tt.args.ctx, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.isExpired != errors.IsResourceExpired(err) {
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
}
})
}
}