Skip to content

Commit

Permalink
Merge pull request #62913 from deads2k/client-04-dynamic
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 63137, 62913). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

make a simple dynamic client that is easy to use

The dynamic client has annoyed me for the last time!  The existing one takes arguments at odd levels, requires lots of information to instantiate, does some weird pool thing, and uses unusual types.  This creates an interface like this:

```go

type DynamicInterface interface {
	ClusterResource(resource schema.GroupVersionResource) DynamicResourceInterface
	NamespacedResource(resource schema.GroupVersionResource, namespace string) DynamicResourceInterface
}

type DynamicResourceInterface interface {
	Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
	Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
	UpdateStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
	Delete(name string, options *metav1.DeleteOptions) error
	DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
	Get(name string, options metav1.GetOptions) (*unstructured.Unstructured, error)
	List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
	Watch(opts metav1.ListOptions) (watch.Interface, error)
	Patch(name string, pt types.PatchType, data []byte, subresources ...string) (*unstructured.Unstructured, error)
}
```

You create it from just a `rest.Config`, no mapper, no path resolving func, no trying to set up codecs ahead of time, no unnecessary pool.  It just works.

I updated the namespace controller to use it and I updated the existing dynamic client to leverage it so that I get all their tests for "free".

@kubernetes/sig-api-machinery-pr-reviews 
@liggitt @smarterclayton @bparees @sttts @ironcladlou I know each of us has struggled with the dynamic client in our time.
@lavalamp @caesarxuchao This is vastly simplifying.  I'm eager to drop the old `ClientPool`.  client-go will technically have another incompatible semver this release.  I'm up for changing it in tree.


```release-note
client-go developers: the new dynamic client is easier to use and the old is deprecated, you must switch.
```
  • Loading branch information
Kubernetes Submit Queue committed Apr 25, 2018
2 parents e1e050a + 3632037 commit 5d7569d
Show file tree
Hide file tree
Showing 26 changed files with 838 additions and 430 deletions.
12 changes: 6 additions & 6 deletions cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
Expand Down Expand Up @@ -294,23 +293,24 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
}

func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := legacyscheme.Registry.RESTMapper()

// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default.
nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller")
nsKubeconfig.QPS *= 10
nsKubeconfig.Burst *= 10
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
namespaceClientPool := dynamic.NewClientPool(nsKubeconfig, restMapper, dynamic.LegacyAPIPathResolverFunc)

discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources

dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
if err != nil {
return true, err
}

namespaceController := namespacecontroller.NewNamespaceController(
namespaceKubeClient,
namespaceClientPool,
dynamicClient,
discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no
// TODO: It's only necessary to talk to the API server if the owner node
// is a "virtual" node. The local graph could lag behind the real
// status, but in practice, the difference is small.
owner, err = client.Resource(resource, item.identity.Namespace).Get(reference.Name, metav1.GetOptions{})
owner, err = client.Resource(resource, resourceDefaultNamespace(resource, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
gc.absentOwnerCache.Add(reference.UID)
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,13 @@ func TestOrphanDependentsFailure(t *testing.T) {
},
}
err := gc.orphanDependents(objectReference{}, dependents)
expected := `the server reported a conflict (patch pods pod)`
expected := `the server reported a conflict`
if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("expected error contains text %s, got %v", expected, err)
if err != nil {
t.Errorf("expected error contains text %q, got %q", expected, err.Error())
} else {
t.Errorf("expected error contains text %q, got nil", expected)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/garbagecollector/graph_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
return client.
Resource(&apiResource, metav1.NamespaceAll).
List(options)
},
Expand All @@ -145,7 +145,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
return client.
Resource(&apiResource, metav1.NamespaceAll).
Watch(options)
},
Expand Down
16 changes: 12 additions & 4 deletions pkg/controller/garbagecollector/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import (
"k8s.io/client-go/util/retry"
)

// cluster scoped resources don't have namespaces. Default to the item's namespace, but clear it for cluster scoped resources
func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace string) string {
if resource.Namespaced {
return defaultNamespace
}
return ""
}

// apiResource consults the REST mapper to translate an <apiVersion, kind,
// namespace> tuple to a unversioned.APIResource struct.
func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) {
Expand Down Expand Up @@ -60,7 +68,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.De
uid := item.UID
preconditions := metav1.Preconditions{UID: &uid}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Delete(item.Name, &deleteOptions)
}

func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
Expand All @@ -73,7 +81,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstr
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Get(item.Name, metav1.GetOptions{})
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Get(item.Name, metav1.GetOptions{})
}

func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
Expand All @@ -86,7 +94,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Update(obj)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Update(obj)
}

func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
Expand All @@ -99,7 +107,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*un
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
}

// TODO: Using Patch when strategicmerge supports deleting an entry from a
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/namespace/deletion/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_test(
srcs = ["namespaced_resources_deleter_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down
52 changes: 16 additions & 36 deletions pkg/controller/namespace/deletion/namespaced_resources_deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ type NamespacedResourcesDeleterInterface interface {
}

func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
clientPool dynamic.ClientPool, podsGetter v1clientset.PodsGetter,
dynamicClient dynamic.DynamicInterface, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{
nsClient: nsClient,
clientPool: clientPool,
podsGetter: podsGetter,
nsClient: nsClient,
dynamicClient: dynamicClient,
podsGetter: podsGetter,
opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool),
},
Expand All @@ -68,7 +68,7 @@ type namespacedResourcesDeleter struct {
// Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources.
clientPool dynamic.ClientPool
dynamicClient dynamic.DynamicInterface
// Interface to get PodInterface.
podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource.
Expand Down Expand Up @@ -328,9 +328,7 @@ func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace)
// deleteCollection is a helper function that will delete the collection of resources
// it returns true if the operation was supported on the server.
// it returns an error if the operation was supported on the server but was unable to complete.
func (d *namespacedResourcesDeleter) deleteCollection(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource,
namespace string) (bool, error) {
func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionResource, namespace string) (bool, error) {
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)

key := operationKey{operation: operationDeleteCollection, gvr: gvr}
Expand All @@ -339,14 +337,12 @@ func (d *namespacedResourcesDeleter) deleteCollection(
return false, nil
}

apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}

// namespace controller does not want the garbage collector to insert the orphan finalizer since it calls
// resource deletions generically. it will ensure all resources in the namespace are purged prior to releasing
// namespace itself.
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(opts, metav1.ListOptions{})
err := d.dynamicClient.NamespacedResource(gvr, namespace).DeleteCollection(opts, metav1.ListOptions{})

if err == nil {
return true, nil
Expand All @@ -373,8 +369,7 @@ func (d *namespacedResourcesDeleter) deleteCollection(
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func (d *namespacedResourcesDeleter) listCollection(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)

key := operationKey{operation: operationList, gvr: gvr}
Expand All @@ -383,13 +378,8 @@ func (d *namespacedResourcesDeleter) listCollection(
return nil, false, nil
}

apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
obj, err := dynamicClient.Resource(&apiResource, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
unstructuredList, err := d.dynamicClient.NamespacedResource(gvr, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
if err == nil {
unstructuredList, ok := obj.(*unstructured.UnstructuredList)
if !ok {
return nil, false, fmt.Errorf("resource: %s, expected *unstructured.UnstructuredList, got %#v", apiResource.Name, obj)
}
return unstructuredList, true, nil
}

Expand All @@ -409,22 +399,20 @@ func (d *namespacedResourcesDeleter) listCollection(
}

// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func (d *namespacedResourcesDeleter) deleteEachItem(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) error {
func (d *namespacedResourcesDeleter) deleteEachItem(gvr schema.GroupVersionResource, namespace string) error {
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)

unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil {
return err
}
if !listSupported {
return nil
}
apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
for _, item := range unstructuredList.Items {
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
if err = d.dynamicClient.NamespacedResource(gvr, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err
}
}
Expand All @@ -447,22 +435,15 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
}
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)

// get a client for this group version...
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}

// first try to delete the entire collection
deleteCollectionSupported, err := d.deleteCollection(dynamicClient, gvr, namespace)
deleteCollectionSupported, err := d.deleteCollection(gvr, namespace)
if err != nil {
return estimate, err
}

// delete collection was not supported, so we list and delete each item...
if !deleteCollectionSupported {
err = d.deleteEachItem(dynamicClient, gvr, namespace)
err = d.deleteEachItem(gvr, namespace)
if err != nil {
return estimate, err
}
Expand All @@ -471,7 +452,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// verify there are no more remaining items
// it is not an error condition for there to be remaining items if local estimate is non-zero
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
Expand All @@ -497,8 +478,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent(
namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
func (d *namespacedResourcesDeleter) deleteAllContent(namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace)
resources, err := d.discoverResourcesFn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
)

Expand Down Expand Up @@ -173,14 +172,16 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
defer srv.Close()

mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}

fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil
}
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
err := d.Delete(testInput.testNamespace.Name)
if err != nil {
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), dynamicClient, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
if err := d.Delete(testInput.testNamespace.Name); err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/namespace/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type NamespaceController struct {
// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
dynamicClient dynamic.DynamicInterface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration,
Expand All @@ -72,7 +72,7 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), clientPool, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), dynamicClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
}

if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubectl/cmd/util/factory_object_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func genericDescriber(clientAccessFactory ClientAccessFactory, mapping *meta.RES
clientConfigCopy.GroupVersion = &gv

// used to fetch the resource
dynamicClient, err := dynamic.NewClient(&clientConfigCopy)
dynamicClient, err := dynamic.NewClient(&clientConfigCopy, gv)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ filegroup(

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme:all-srcs",
],
tags = ["automanaged"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ func (obj *Unstructured) IsList() bool {
_, ok = field.([]interface{})
return ok
}
func (obj *Unstructured) ToList() (*UnstructuredList, error) {
if !obj.IsList() {
// return an empty list back
return &UnstructuredList{Object: obj.Object}, nil
}

ret := &UnstructuredList{}
ret.Object = obj.Object

err := obj.EachListItem(func(item runtime.Object) error {
castItem := item.(*Unstructured)
ret.Items = append(ret.Items, *castItem)
return nil
})
if err != nil {
return nil, err
}

return ret, nil
}

func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {
field, ok := obj.Object["items"]
Expand Down

0 comments on commit 5d7569d

Please sign in to comment.