Skip to content

Commit

Permalink
Support api chunking in kubectl get
Browse files Browse the repository at this point in the history
This enables chunking in the resource builder to make it easy to
retrieve resources in pages and visit partial result sets. This adds
`--chunk-size` to `kubectl get` only so that users can get comfortable
with the use of chunking in beta. Future changes will enable chunking
for all CLI commands so that bulk actions can be performed more
efficiently.
  • Loading branch information
smarterclayton committed Oct 12, 2017
1 parent 0b1da1f commit da1065f
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 38 deletions.
13 changes: 12 additions & 1 deletion hack/make-rules/test-cmd-util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ run_kubectl_get_tests() {
fi

### Test kubectl get all
output_message=$(kubectl --v=6 --namespace default get all 2>&1 "${kube_flags[@]}")
output_message=$(kubectl --v=6 --namespace default get all --experimental-chunk-size=0 2>&1 "${kube_flags[@]}")
# Post-condition: Check if we get 200 OK from all the url(s)
kube::test::if_has_string "${output_message}" "/api/v1/namespaces/default/pods 200 OK"
kube::test::if_has_string "${output_message}" "/api/v1/namespaces/default/replicationcontrollers 200 OK"
Expand All @@ -1356,6 +1356,17 @@ run_kubectl_get_tests() {
kube::test::if_has_string "${output_message}" "/apis/extensions/v1beta1/namespaces/default/deployments 200 OK"
kube::test::if_has_string "${output_message}" "/apis/extensions/v1beta1/namespaces/default/replicasets 200 OK"

### Test kubectl get chunk size
output_message=$(kubectl --v=6 get clusterrole --experimental-chunk-size=10 2>&1 "${kube_flags[@]}")
# Post-condition: Check if we get a limit and continue
kube::test::if_has_string "${output_message}" "/clusterroles?limit=10 200 OK"
kube::test::if_has_string "${output_message}" "/v1/clusterroles?continue="

### Test kubectl get chunk size defaults to 500
output_message=$(kubectl --v=6 get clusterrole 2>&1 "${kube_flags[@]}")
# Post-condition: Check if we get a limit and continue
kube::test::if_has_string "${output_message}" "/clusterroles?limit=500 200 OK"

### Test --allow-missing-template-keys
# Pre-condition: no POD exists
create_and_use_new_namespace
Expand Down
10 changes: 9 additions & 1 deletion pkg/kubectl/cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,15 @@ func (p *pruner) prune(namespace string, mapping *meta.RESTMapping, shortOutput,
return err
}

objList, err := resource.NewHelper(c, mapping).List(namespace, mapping.GroupVersionKind.Version, p.selector, false, includeUninitialized)
objList, err := resource.NewHelper(c, mapping).List(
namespace,
mapping.GroupVersionKind.Version,
false,
&metav1.ListOptions{
LabelSelector: p.selector,
IncludeUninitialized: includeUninitialized,
},
)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubectl/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type GetOptions struct {

IgnoreNotFound bool
Raw string
ChunkSize int64
}

var (
Expand Down Expand Up @@ -137,6 +138,7 @@ func NewCmdGet(f cmdutil.Factory, out io.Writer, errOut io.Writer) *cobra.Comman
cmd.Flags().Bool("show-kind", false, "If present, list the resource type for the requested object(s).")
cmd.Flags().Bool("all-namespaces", false, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags().BoolVar(&options.IgnoreNotFound, "ignore-not-found", false, "Treat \"resource not found\" as a successful retrieval.")
cmd.Flags().Int64Var(&options.ChunkSize, "experimental-chunk-size", 500, "Return large lists in chunks rather than all at once. Pass 0 to disable.")
cmd.Flags().StringSliceP("label-columns", "L", []string{}, "Accepts a comma separated list of labels that are going to be presented as columns. Names are case-sensitive. You can also use multiple flag options like -L label1 -L label2...")
cmd.Flags().Bool("export", false, "If true, use 'export' for the resources. Exported resources are stripped of cluster-specific information.")
addOpenAPIPrintColumnFlags(cmd)
Expand Down Expand Up @@ -223,6 +225,7 @@ func RunGet(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args [
FilenameParam(enforceNamespace, &options.FilenameOptions).
SelectorParam(selector).
ExportParam(export).
RequestChunksOf(options.ChunkSize).
IncludeUninitialized(includeUninitialized).
ResourceTypeOrNameArgs(true, args...).
SingleResourceType().
Expand Down Expand Up @@ -325,6 +328,7 @@ func RunGet(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args [
FilenameParam(enforceNamespace, &options.FilenameOptions).
SelectorParam(selector).
ExportParam(export).
RequestChunksOf(options.ChunkSize).
IncludeUninitialized(includeUninitialized).
ResourceTypeOrNameArgs(true, args...).
ContinueOnError().
Expand Down
11 changes: 10 additions & 1 deletion pkg/kubectl/resource/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Builder struct {
selector *string
selectAll bool
includeUninitialized bool
limitChunks int64

resources []string

Expand Down Expand Up @@ -355,6 +356,14 @@ func (b *Builder) RequireNamespace() *Builder {
return b
}

// RequestChunksOf attempts to load responses from the server in batches of size limit
// to avoid long delays loading and transferring very large lists. If unset defaults to
// no chunking.
func (b *Builder) RequestChunksOf(chunkSize int64) *Builder {
b.limitChunks = chunkSize
return b
}

// SelectEverythingParam
func (b *Builder) SelectAllParam(selectAll bool) *Builder {
if selectAll && b.selector != nil {
Expand Down Expand Up @@ -653,7 +662,7 @@ func (b *Builder) visitBySelector() *Result {
if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
selectorNamespace = ""
}
visitors = append(visitors, NewSelector(client, mapping, selectorNamespace, *b.selector, b.export, b.includeUninitialized))
visitors = append(visitors, NewSelector(client, mapping, selectorNamespace, *b.selector, b.export, b.includeUninitialized, b.limitChunks))
}
if b.continueOnError {
result.visitor = EagerVisitorList(visitors)
Expand Down
10 changes: 2 additions & 8 deletions pkg/kubectl/resource/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,15 @@ func (m *Helper) Get(namespace, name string, export bool) (runtime.Object, error
return req.Do().Get()
}

// TODO: add field selector
func (m *Helper) List(namespace, apiVersion string, selector string, export, includeUninitialized bool) (runtime.Object, error) {
func (m *Helper) List(namespace, apiVersion string, export bool, options *metav1.ListOptions) (runtime.Object, error) {
req := m.RESTClient.Get().
NamespaceIfScoped(namespace, m.NamespaceScoped).
Resource(m.Resource).
VersionedParams(&metav1.ListOptions{
LabelSelector: selector,
}, metav1.ParameterCodec)
VersionedParams(options, metav1.ParameterCodec)
if export {
// TODO: I should be part of ListOptions
req.Param("export", strconv.FormatBool(export))
}
if includeUninitialized {
req.Param("includeUninitialized", strconv.FormatBool(includeUninitialized))
}
return req.Do().Get()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubectl/resource/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestHelperList(t *testing.T) {
RESTClient: client,
NamespaceScoped: true,
}
obj, err := modifier.List("bar", api.Registry.GroupOrDie(api.GroupName).GroupVersion.String(), "foo=baz", false, false)
obj, err := modifier.List("bar", api.Registry.GroupOrDie(api.GroupName).GroupVersion.String(), false, &metav1.ListOptions{LabelSelector: "foo=baz"})
if (err != nil) != test.Err {
t.Errorf("unexpected error: %t %v", test.Err, err)
}
Expand Down
80 changes: 54 additions & 26 deletions pkg/kubectl/resource/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

Expand All @@ -32,53 +33,80 @@ type Selector struct {
Selector string
Export bool
IncludeUninitialized bool
LimitChunks int64
}

// NewSelector creates a resource selector which hides details of getting items by their label selector.
func NewSelector(client RESTClient, mapping *meta.RESTMapping, namespace string, selector string, export, includeUninitialized bool) *Selector {
func NewSelector(client RESTClient, mapping *meta.RESTMapping, namespace string, selector string, export, includeUninitialized bool, limitChunks int64) *Selector {
return &Selector{
Client: client,
Mapping: mapping,
Namespace: namespace,
Selector: selector,
Export: export,
IncludeUninitialized: includeUninitialized,
LimitChunks: limitChunks,
}
}

// Visit implements Visitor
// Visit implements Visitor and uses request chunking by default.
func (r *Selector) Visit(fn VisitorFunc) error {
list, err := NewHelper(r.Client, r.Mapping).List(r.Namespace, r.ResourceMapping().GroupVersionKind.GroupVersion().String(), r.Selector, r.Export, r.IncludeUninitialized)
if err != nil {
if errors.IsBadRequest(err) || errors.IsNotFound(err) {
if se, ok := err.(*errors.StatusError); ok {
// modify the message without hiding this is an API error
var continueToken string
for {
list, err := NewHelper(r.Client, r.Mapping).List(
r.Namespace,
r.ResourceMapping().GroupVersionKind.GroupVersion().String(),
r.Export,
&metav1.ListOptions{
LabelSelector: r.Selector,
IncludeUninitialized: r.IncludeUninitialized,
Limit: r.LimitChunks,
Continue: continueToken,
},
)
if err != nil {
if errors.IsResourceExpired(err) {
return err
}
if errors.IsBadRequest(err) || errors.IsNotFound(err) {
if se, ok := err.(*errors.StatusError); ok {
// modify the message without hiding this is an API error
if len(r.Selector) == 0 {
se.ErrStatus.Message = fmt.Sprintf("Unable to list %q: %v", r.Mapping.Resource, se.ErrStatus.Message)
} else {
se.ErrStatus.Message = fmt.Sprintf("Unable to find %q that match the selector %q: %v", r.Mapping.Resource, r.Selector, se.ErrStatus.Message)
}
return se
}
if len(r.Selector) == 0 {
se.ErrStatus.Message = fmt.Sprintf("Unable to list %q: %v", r.Mapping.Resource, se.ErrStatus.Message)
} else {
se.ErrStatus.Message = fmt.Sprintf("Unable to find %q that match the selector %q: %v", r.Mapping.Resource, r.Selector, se.ErrStatus.Message)
return fmt.Errorf("Unable to list %q: %v", r.Mapping.Resource, err)
}
return se
}
if len(r.Selector) == 0 {
return fmt.Errorf("Unable to list %q: %v", r.Mapping.Resource, err)
} else {
return fmt.Errorf("Unable to find %q that match the selector %q: %v", r.Mapping.Resource, r.Selector, err)
}
if err := fn(nil, err); err != nil {
return err
}
continue
}
return err
}
accessor := r.Mapping.MetadataAccessor
resourceVersion, _ := accessor.ResourceVersion(list)
info := &Info{
Client: r.Client,
Mapping: r.Mapping,
Namespace: r.Namespace,
accessor := r.Mapping.MetadataAccessor
resourceVersion, _ := accessor.ResourceVersion(list)
nextContinueToken, _ := accessor.Continue(list)
info := &Info{
Client: r.Client,
Mapping: r.Mapping,
Namespace: r.Namespace,

Object: list,
ResourceVersion: resourceVersion,
Object: list,
ResourceVersion: resourceVersion,
}
if err := fn(info, nil); err != nil {
return err
}
if len(nextContinueToken) == 0 {
return nil
}
continueToken = nextContinueToken
}
return fn(info, nil)
}

func (r *Selector) Watch(resourceVersion string) (watch.Interface, error) {
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/meta/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type MetadataAccessor interface {
Annotations(obj runtime.Object) (map[string]string, error)
SetAnnotations(obj runtime.Object, annotations map[string]string) error

Continue(obj runtime.Object) (string, error)
SetContinue(obj runtime.Object, c string) error

runtime.ResourceVersioner
}

Expand Down
17 changes: 17 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,23 @@ func (resourceAccessor) SetResourceVersion(obj runtime.Object, version string) e
return nil
}

func (resourceAccessor) Continue(obj runtime.Object) (string, error) {
accessor, err := ListAccessor(obj)
if err != nil {
return "", err
}
return accessor.GetContinue(), nil
}

func (resourceAccessor) SetContinue(obj runtime.Object, version string) error {
accessor, err := ListAccessor(obj)
if err != nil {
return err
}
accessor.SetContinue(version)
return nil
}

// extractFromOwnerReference extracts v to o. v is the OwnerReferences field of an object.
func extractFromOwnerReference(v reflect.Value, o *metav1.OwnerReference) error {
if err := runtime.Field(v, "APIVersion", &o.APIVersion); err != nil {
Expand Down

0 comments on commit da1065f

Please sign in to comment.