Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make a simple dynamic client that is easy to use #62913

Merged
merged 1 commit into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
Expand Down Expand Up @@ -294,23 +293,24 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
}

func startNamespaceController(ctx ControllerContext) (bool, error) {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := legacyscheme.Registry.RESTMapper()

// the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls
// the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource
// including events), takes ~10 seconds by default.
nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller")
nsKubeconfig.QPS *= 10
nsKubeconfig.Burst *= 10
namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
namespaceClientPool := dynamic.NewClientPool(nsKubeconfig, restMapper, dynamic.LegacyAPIPathResolverFunc)

discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources

dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
if err != nil {
return true, err
}

namespaceController := namespacecontroller.NewNamespaceController(
namespaceKubeClient,
namespaceClientPool,
dynamicClient,
discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no
// TODO: It's only necessary to talk to the API server if the owner node
// is a "virtual" node. The local graph could lag behind the real
// status, but in practice, the difference is small.
owner, err = client.Resource(resource, item.identity.Namespace).Get(reference.Name, metav1.GetOptions{})
owner, err = client.Resource(resource, resourceDefaultNamespace(resource, item.identity.Namespace)).Get(reference.Name, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
gc.absentOwnerCache.Add(reference.UID)
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,13 @@ func TestOrphanDependentsFailure(t *testing.T) {
},
}
err := gc.orphanDependents(objectReference{}, dependents)
expected := `the server reported a conflict (patch pods pod)`
expected := `the server reported a conflict`
if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("expected error contains text %s, got %v", expected, err)
if err != nil {
t.Errorf("expected error contains text %q, got %q", expected, err.Error())
} else {
t.Errorf("expected error contains text %q, got nil", expected)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/garbagecollector/graph_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
return client.
Resource(&apiResource, metav1.NamespaceAll).
List(options)
},
Expand All @@ -145,7 +145,7 @@ func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource)
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
return client.
Resource(&apiResource, metav1.NamespaceAll).
Watch(options)
},
Expand Down
16 changes: 12 additions & 4 deletions pkg/controller/garbagecollector/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import (
"k8s.io/client-go/util/retry"
)

// cluster scoped resources don't have namespaces. Default to the item's namespace, but clear it for cluster scoped resources
func resourceDefaultNamespace(resource *metav1.APIResource, defaultNamespace string) string {
if resource.Namespaced {
return defaultNamespace
}
return ""
}

// apiResource consults the REST mapper to translate an <apiVersion, kind,
// namespace> tuple to a unversioned.APIResource struct.
func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) {
Expand Down Expand Up @@ -60,7 +68,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.De
uid := item.UID
preconditions := metav1.Preconditions{UID: &uid}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Delete(item.Name, &deleteOptions)
}

func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
Expand All @@ -73,7 +81,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstr
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Get(item.Name, metav1.GetOptions{})
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Get(item.Name, metav1.GetOptions{})
}

func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
Expand All @@ -86,7 +94,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Update(obj)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Update(obj)
}

func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
Expand All @@ -99,7 +107,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*un
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch)
return client.Resource(resource, resourceDefaultNamespace(resource, item.Namespace)).Patch(item.Name, types.StrategicMergePatchType, patch)
}

// TODO: Using Patch when strategicmerge supports deleting an entry from a
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/namespace/deletion/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_test(
srcs = ["namespaced_resources_deleter_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down
52 changes: 16 additions & 36 deletions pkg/controller/namespace/deletion/namespaced_resources_deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ type NamespacedResourcesDeleterInterface interface {
}

func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
clientPool dynamic.ClientPool, podsGetter v1clientset.PodsGetter,
dynamicClient dynamic.DynamicInterface, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{
nsClient: nsClient,
clientPool: clientPool,
podsGetter: podsGetter,
nsClient: nsClient,
dynamicClient: dynamicClient,
podsGetter: podsGetter,
opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool),
},
Expand All @@ -68,7 +68,7 @@ type namespacedResourcesDeleter struct {
// Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources.
clientPool dynamic.ClientPool
dynamicClient dynamic.DynamicInterface
// Interface to get PodInterface.
podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource.
Expand Down Expand Up @@ -328,9 +328,7 @@ func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace)
// deleteCollection is a helper function that will delete the collection of resources
// it returns true if the operation was supported on the server.
// it returns an error if the operation was supported on the server but was unable to complete.
func (d *namespacedResourcesDeleter) deleteCollection(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource,
namespace string) (bool, error) {
func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionResource, namespace string) (bool, error) {
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)

key := operationKey{operation: operationDeleteCollection, gvr: gvr}
Expand All @@ -339,14 +337,12 @@ func (d *namespacedResourcesDeleter) deleteCollection(
return false, nil
}

apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}

// namespace controller does not want the garbage collector to insert the orphan finalizer since it calls
// resource deletions generically. it will ensure all resources in the namespace are purged prior to releasing
// namespace itself.
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(opts, metav1.ListOptions{})
err := d.dynamicClient.NamespacedResource(gvr, namespace).DeleteCollection(opts, metav1.ListOptions{})

if err == nil {
return true, nil
Expand All @@ -373,8 +369,7 @@ func (d *namespacedResourcesDeleter) deleteCollection(
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func (d *namespacedResourcesDeleter) listCollection(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)

key := operationKey{operation: operationList, gvr: gvr}
Expand All @@ -383,13 +378,8 @@ func (d *namespacedResourcesDeleter) listCollection(
return nil, false, nil
}

apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
obj, err := dynamicClient.Resource(&apiResource, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
unstructuredList, err := d.dynamicClient.NamespacedResource(gvr, namespace).List(metav1.ListOptions{IncludeUninitialized: true})
if err == nil {
unstructuredList, ok := obj.(*unstructured.UnstructuredList)
if !ok {
return nil, false, fmt.Errorf("resource: %s, expected *unstructured.UnstructuredList, got %#v", apiResource.Name, obj)
}
return unstructuredList, true, nil
}

Expand All @@ -409,22 +399,20 @@ func (d *namespacedResourcesDeleter) listCollection(
}

// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func (d *namespacedResourcesDeleter) deleteEachItem(
dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string) error {
func (d *namespacedResourcesDeleter) deleteEachItem(gvr schema.GroupVersionResource, namespace string) error {
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)

unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil {
return err
}
if !listSupported {
return nil
}
apiResource := metav1.APIResource{Name: gvr.Resource, Namespaced: true}
for _, item := range unstructuredList.Items {
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
if err = d.dynamicClient.NamespacedResource(gvr, namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err
}
}
Expand All @@ -447,22 +435,15 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
}
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)

// get a client for this group version...
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}

// first try to delete the entire collection
deleteCollectionSupported, err := d.deleteCollection(dynamicClient, gvr, namespace)
deleteCollectionSupported, err := d.deleteCollection(gvr, namespace)
if err != nil {
return estimate, err
}

// delete collection was not supported, so we list and delete each item...
if !deleteCollectionSupported {
err = d.deleteEachItem(dynamicClient, gvr, namespace)
err = d.deleteEachItem(gvr, namespace)
if err != nil {
return estimate, err
}
Expand All @@ -471,7 +452,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// verify there are no more remaining items
// it is not an error condition for there to be remaining items if local estimate is non-zero
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
unstructuredList, listSupported, err := d.listCollection(gvr, namespace)
if err != nil {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
Expand All @@ -497,8 +478,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent(
namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
func (d *namespacedResourcesDeleter) deleteAllContent(namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace)
resources, err := d.discoverResourcesFn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
)

Expand Down Expand Up @@ -173,14 +172,16 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
defer srv.Close()

mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
dynamicClient, err := dynamic.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}

fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil
}
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
err := d.Delete(testInput.testNamespace.Name)
if err != nil {
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), dynamicClient, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
if err := d.Delete(testInput.testNamespace.Name); err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/namespace/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type NamespaceController struct {
// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
dynamicClient dynamic.DynamicInterface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration,
Expand All @@ -72,7 +72,7 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), clientPool, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), dynamicClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
}

if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubectl/cmd/util/factory_object_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func genericDescriber(clientAccessFactory ClientAccessFactory, mapping *meta.RES
clientConfigCopy.GroupVersion = &gv

// used to fetch the resource
dynamicClient, err := dynamic.NewClient(&clientConfigCopy)
dynamicClient, err := dynamic.NewClient(&clientConfigCopy, gv)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ filegroup(

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme:all-srcs",
],
tags = ["automanaged"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ func (obj *Unstructured) IsList() bool {
_, ok = field.([]interface{})
return ok
}
func (obj *Unstructured) ToList() (*UnstructuredList, error) {
if !obj.IsList() {
// return an empty list back
return &UnstructuredList{Object: obj.Object}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem correct... shouldn't this either return an error or put the object in Items?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happens on partial input. Logically you may have no items. In which case islist says false, but you know it's a list and needs to be empty.

}

ret := &UnstructuredList{}
ret.Object = obj.Object

err := obj.EachListItem(func(item runtime.Object) error {
castItem := item.(*Unstructured)
ret.Items = append(ret.Items, *castItem)
return nil
})
if err != nil {
return nil, err
}

return ret, nil
}

func (obj *Unstructured) EachListItem(fn func(runtime.Object) error) error {
field, ok := obj.Object["items"]
Expand Down