Skip to content

Commit

Permalink
use kubernetes informer framework
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Oct 17, 2016
1 parent f689aa5 commit 9bbd1af
Showing 1 changed file with 22 additions and 37 deletions.
59 changes: 22 additions & 37 deletions main.go
Expand Up @@ -26,12 +26,11 @@ import (
"github.com/golang/glog"
"github.com/openshift/origin/pkg/util/proc"
flag "github.com/spf13/pflag"
"golang.org/x/net/context"
clientset "k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/runtime"
"k8s.io/client-go/1.5/pkg/watch"
restclient "k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
"k8s.io/client-go/1.5/tools/clientcmd"
Expand Down Expand Up @@ -73,7 +72,7 @@ func main() {
glog.Fatalf("Failed to create client: %v", err)
}

initializeMetrics(kubeClient)
initializeMetricCollection(kubeClient)
metricsServer()
}

Expand Down Expand Up @@ -174,54 +173,36 @@ func (l NodeLister) List() (v1.NodeList, error) {
return l()
}

// initializeMetrics creates a new controller from the given config.
func initializeMetrics(kubeClient clientset.Interface) {
dplStore, dplController := cache.NewNamespaceKeyedIndexerAndReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
}, &v1beta1.Deployment{}, resyncPeriod)

podStore, podController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"pods",
api.NamespaceAll,
nil,
), &v1.Pod{}, resyncPeriod)

nodeStore, nodeController := cache.NewNamespaceKeyedIndexerAndReflector(
cache.NewListWatchFromClient(
kubeClient.Core().GetRESTClient(),
"nodes",
api.NamespaceAll,
nil,
), &v1.Node{}, resyncPeriod)

go dplController.Run()
go podController.Run()
go nodeController.Run()
// initializeMetricCollection creates and starts informers and initializes and
// registers metrics for collection.
func initializeMetricCollection(kubeClient clientset.Interface) {
cclient := kubeClient.Core().GetRESTClient()
eclient := kubeClient.Extensions().GetRESTClient()

dlw := cache.NewListWatchFromClient(eclient, "deployments", api.NamespaceAll, nil)
plw := cache.NewListWatchFromClient(cclient, "pods", api.NamespaceAll, nil)
nlw := cache.NewListWatchFromClient(cclient, "nodes", api.NamespaceAll, nil)

dinf := cache.NewSharedInformer(dlw, &v1beta1.Deployment{}, resyncPeriod)
pinf := cache.NewSharedInformer(plw, &v1.Pod{}, resyncPeriod)
ninf := cache.NewSharedInformer(nlw, &v1.Node{}, resyncPeriod)

dplLister := DeploymentLister(func() (deployments []v1beta1.Deployment, err error) {
for _, c := range dplStore.List() {
for _, c := range dinf.GetStore().List() {
deployments = append(deployments, *(c.(*v1beta1.Deployment)))
}
return deployments, nil
})

podLister := PodLister(func() (pods []v1.Pod, err error) {
for _, m := range podStore.List() {
for _, m := range pinf.GetStore().List() {
pods = append(pods, *m.(*v1.Pod))
}
return pods, nil
})

nodeLister := NodeLister(func() (machines v1.NodeList, err error) {
for _, m := range nodeStore.List() {
for _, m := range ninf.GetStore().List() {
machines.Items = append(machines.Items, *(m.(*v1.Node)))
}
return machines, nil
Expand All @@ -230,4 +211,8 @@ func initializeMetrics(kubeClient clientset.Interface) {
prometheus.MustRegister(&deploymentCollector{store: dplLister})
prometheus.MustRegister(&podCollector{store: podLister})
prometheus.MustRegister(&nodeCollector{store: nodeLister})

go dinf.Run(context.Background().Done())
go pinf.Run(context.Background().Done())
go ninf.Run(context.Background().Done())
}

0 comments on commit 9bbd1af

Please sign in to comment.