Move orphaned Pod deletion logic to PodGC
gmarek committed Sep 26, 2016
1 parent c19e08e · commit 2c0ba1b
Showing 7 changed files with 213 additions and 201 deletions.
4 changes: 2 additions & 2 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -221,8 +221,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

if s.TerminatedPodGCThreshold > 0 {
go podgc.New(client("pod-garbage-collector"), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)).
Run(wait.NeverStop)
go podgc.NewFromInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), sharedInformers.Pods().Informer(),
int(s.TerminatedPodGCThreshold)).Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

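The hunk above also moves PodGC onto the controller manager's shared pod informer: instead of every controller running its own list/watch against the API server on a private resync period, a single informer feeds all of them. A minimal, self-contained sketch of that sharing pattern follows; the stub types stand in for the real client-go shared-informer machinery, and everything here is illustrative.

package main

import "fmt"

// sharedPodStore stands in for the shared informer's cache: one store,
// handed to every controller that needs pods.
type sharedPodStore struct{ pods []string }

func (s *sharedPodStore) list() []string { return s.pods }

// Both controllers read the same store rather than each maintaining a
// private watch against the API server.
type gcController struct{ store *sharedPodStore }
type rcController struct{ store *sharedPodStore }

func main() {
	shared := &sharedPodStore{pods: []string{"pod0", "pod1"}}
	gc := gcController{store: shared}
	rc := rcController{store: shared}
	fmt.Println(gc.store.list(), rc.store.list()) // one cache, two consumers
}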
6 changes: 2 additions & 4 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -145,10 +145,8 @@ func (s *CMServer) Run(_ []string) error {
go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, int(s.LookupCacheSizeForRC)).
Run(int(s.ConcurrentRCSyncs), wait.NeverStop)

if s.TerminatedPodGCThreshold > 0 {
go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), s.resyncPeriod, int(s.TerminatedPodGCThreshold)).
Run(wait.NeverStop)
}
go podgc.NewFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), int(s.TerminatedPodGCThreshold)).
Run(wait.NeverStop)

//TODO(jdef) should eventually support more cloud providers here
if s.CloudProvider != mesos.ProviderName {
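Note the contrast with cmd/kube-controller-manager above: the mesos manager now starts PodGC unconditionally, dropping its if s.TerminatedPodGCThreshold > 0 guard. That only makes sense if the threshold check moved inside the controller, so orphan cleanup runs even when terminated-pod GC is disabled. A self-contained sketch of that presumed split follows; the type, field, and method names are assumptions, since the pkg/controller/podgc diff is not among the files shown here.

package main

import "fmt"

// PodGCController is a simplified stand-in for the real controller type; its
// field and method names are assumptions, not the actual podgc code.
type PodGCController struct {
	terminatedPodThreshold int
}

// gcTerminated would delete terminated pods once their count exceeds the
// configured threshold.
func (gcc *PodGCController) gcTerminated() { fmt.Println("collecting terminated pods") }

// gcOrphaned would delete pods bound to nodes that no longer exist.
func (gcc *PodGCController) gcOrphaned() { fmt.Println("collecting orphaned pods") }

// gc gates terminated-pod collection on the threshold but always sweeps
// orphans, which is what lets the caller drop its own guard.
func (gcc *PodGCController) gc() {
	if gcc.terminatedPodThreshold > 0 {
		gcc.gcTerminated()
	}
	gcc.gcOrphaned()
}

func main() {
	// Threshold 0: terminated-pod GC is off, but the orphan sweep still runs.
	(&PodGCController{terminatedPodThreshold: 0}).gc()
}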
16 changes: 0 additions & 16 deletions pkg/controller/node/controller_utils.go
@@ -40,22 +40,6 @@ const (
LargeClusterThreshold = 20
)

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := forcefulDeletePodFunc(pod); err != nil {
utilruntime.HandleError(err)
}
}
}

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
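The cleanupOrphanedPods helper deleted above is the logic this commit relocates. Below is a self-contained sketch of the same algorithm as it would live in PodGC, with plain local types standing in for the API objects and stores; the real replacement code is in pkg/controller/podgc, which is not shown in this view.

package main

import "fmt"

// Pod is a simplified stand-in for api.Pod.
type Pod struct {
	Name     string
	NodeName string
}

// gcOrphaned mirrors the removed cleanupOrphanedPods: force-delete every pod
// bound to a node that no longer exists in the node store.
func gcOrphaned(pods []Pod, nodes map[string]bool, forceDelete func(Pod) error) {
	for _, pod := range pods {
		if pod.NodeName == "" {
			continue // unscheduled pods are not orphans
		}
		if nodes[pod.NodeName] {
			continue // the node still exists
		}
		if err := forceDelete(pod); err != nil {
			fmt.Println("force delete failed:", err) // the real code calls utilruntime.HandleError
		}
	}
}

func main() {
	pods := []Pod{{"a", "foo"}, {"b", "bar"}, {"c", "gone"}}
	nodes := map[string]bool{"foo": true, "bar": true}
	gcOrphaned(pods, nodes, func(p Pod) error {
		fmt.Println("deleting orphaned pod", p.Name)
		return nil
	})
}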
10 changes: 0 additions & 10 deletions pkg/controller/node/nodecontroller.go
@@ -464,15 +464,6 @@ func (nc *NodeController) Run() {
})
}
}, nodeEvictionPeriod, wait.NeverStop)

go wait.Until(func() {
pods, err := nc.podStore.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
}, 30*time.Second, wait.NeverStop)
}

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
@@ -511,7 +502,6 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
nc.evictPods(deleted[i])
delete(nc.knownNodeSet, deleted[i].Name)
}

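With these hunks, NodeController.Run no longer schedules the 30-second orphan sweep, and node deletion no longer triggers evictPods; the periodic schedule presumably moves into PodGC's Run loop along with the sweep itself. A self-contained sketch of the removed wait.Until pattern follows; the sweep body and stop channel here are illustrative.

package main

import (
	"fmt"
	"time"
)

// sweepEvery mimics the removed wait.Until call: run sweep immediately, then
// again every interval, until stop is closed. The real wait.Until also
// recovers panics raised by the function it runs.
func sweepEvery(interval time.Duration, stop <-chan struct{}, sweep func()) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		sweep()
		select {
		case <-time.After(interval):
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go sweepEvery(30*time.Millisecond, stop, func() { fmt.Println("orphan sweep") })
	time.Sleep(100 * time.Millisecond)
	close(stop)
}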
160 changes: 19 additions & 141 deletions pkg/controller/node/nodecontroller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package node

import (
"strings"
"testing"
"time"

@@ -1560,117 +1561,28 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}
}

func TestNodeDeletion(t *testing.T) {
fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "node1",
CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
},
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
},
},
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute,
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
fakeNodeHandler.Delete("node1", nil)
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore)
return true, 0
})
podEvicted := false
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podEvicted = true
}
}
if !podEvicted {
t.Error("expected pods to be evicted from the deleted node")
}
}

func TestNodeEventGeneration(t *testing.T) {
fakeNow := unversioned.Date(2016, 8, 10, 12, 0, 0, 0, time.UTC)
fakeNow := unversioned.Date(2016, 9, 10, 12, 0, 0, 0, time.UTC)
fakeNodeHandler := &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
UID: "1234567890",
CreationTimestamp: unversioned.Date(2016, 8, 10, 0, 0, 0, 0, time.UTC),
CreationTimestamp: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
},
Spec: api.NodeSpec{
ExternalID: "node0",
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
// Node status has just been updated.
LastHeartbeatTime: fakeNow,
LastTransitionTime: fakeNow,
Type: api.NodeReady,
Status: api.ConditionUnknown,
LastHeartbeatTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2015, 8, 10, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceRequestsCPU): resource.MustParse("10"),
api.ResourceName(api.ResourceMemory): resource.MustParse("20G"),
},
},
},
},
@@ -1681,27 +1593,25 @@ func TestNodeEventGeneration(t *testing.T) {
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
return false, nil
}
nodeController.now = func() unversioned.Time { return fakeNow }
fakeRecorder := NewFakeRecorder()
nodeController.recorder = fakeRecorder
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}

fakeNodeHandler.Delete("node0", nil)
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
return true, 0
})
if len(fakeRecorder.events) != 3 {
t.Fatalf("unexpected events: %v", fakeRecorder.events)
if len(fakeRecorder.events) != 2 {
t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.events), 2, fakeRecorder.events)
}
if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "RemovingNode" || fakeRecorder.events[2].Reason != "DeletingAllPods" {
t.Fatalf("unexpected events generation: %v", fakeRecorder.events)
if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "DeletingNode" {
var reasons []string
for _, event := range fakeRecorder.events {
reasons = append(reasons, event.Reason)
}
t.Fatalf("unexpected events generation: %v", strings.Join(reasons, ","))
}
for _, event := range fakeRecorder.events {
involvedObject := event.InvolvedObject
@@ -1851,38 +1761,6 @@ func TestCheckPod(t *testing.T) {
}
}

func TestCleanupOrphanedPods(t *testing.T) {
pods := []*api.Pod{
newPod("a", "foo"),
newPod("b", "bar"),
newPod("c", "gone"),
}
nc, _ := NewNodeControllerFromClient(nil, nil, 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)

nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
for _, pod := range pods {
p := pod
nc.podStore.Indexer.Add(&p)
}

var deleteCalls int
var deletedPodName string
forcefullyDeletePodFunc := func(p *api.Pod) error {
deleteCalls++
deletedPodName = p.ObjectMeta.Name
return nil
}
cleanupOrphanedPods(pods, nc.nodeStore.Store, forcefullyDeletePodFunc)

if deleteCalls != 1 {
t.Fatalf("expected one delete, got: %v", deleteCalls)
}
if deletedPodName != "c" {
t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName)
}
}

func TestCheckNodeKubeletVersionParsing(t *testing.T) {
tests := []struct {
version string
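TestCleanupOrphanedPods is removed above together with the helper it covered; a replacement test presumably lands next to the relocated logic in pkg/controller/podgc. A minimal sketch of what such a test checks, reusing the Pod type and gcOrphaned function from the sketch after controller_utils.go above (names are illustrative):

package main

import "testing"

// TestGCOrphaned mirrors the removed TestCleanupOrphanedPods: of pods bound to
// nodes "foo", "bar", and the missing "gone", only the last is an orphan and
// only it should be force-deleted.
func TestGCOrphaned(t *testing.T) {
	pods := []Pod{{"a", "foo"}, {"b", "bar"}, {"c", "gone"}}
	nodes := map[string]bool{"foo": true, "bar": true}

	var deleted []string
	gcOrphaned(pods, nodes, func(p Pod) error {
		deleted = append(deleted, p.Name)
		return nil
	})

	if len(deleted) != 1 || deleted[0] != "c" {
		t.Fatalf("expected only pod %q to be deleted, got: %v", "c", deleted)
	}
}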
