
Commit a29aca4

Authored Sep 13, 2022

fix: Mark nodes that are pending deletion in cluster state for scheduling (#2477)

1 parent 436374b · commit a29aca4
File tree

11 files changed: +527 -49 lines changed

‎pkg/controllers/consolidation/controller.go

+46 -31

@@ -42,6 +42,7 @@ import (
     "github.com/aws/karpenter/pkg/controllers/state"
     "github.com/aws/karpenter/pkg/events"
     "github.com/aws/karpenter/pkg/metrics"
+    nodeutils "github.com/aws/karpenter/pkg/utils/node"
     "github.com/aws/karpenter/pkg/utils/pod"
 )
 
@@ -202,6 +203,10 @@ func (c *Controller) candidateNodes(ctx context.Context) ([]candidateNode, error
            provisioner = provisioners[provName]
            instanceTypeMap = instanceTypesByProvisioner[provName]
        }
+       // skip any nodes that are already marked for deletion and being handled
+       if n.MarkedForDeletion {
+           return true
+       }
        // skip any nodes where we can't determine the provisioner
        if provisioner == nil || instanceTypeMap == nil {
            return true
@@ -242,7 +247,7 @@ func (c *Controller) candidateNodes(ctx context.Context) ([]candidateNode, error
            return true
        }
 
-       pods, err := c.getNodePods(ctx, n.Node.Name)
+       pods, err := nodeutils.GetNodePods(ctx, c.kubeClient, n.Node)
        if err != nil {
            logging.FromContext(ctx).Errorf("Determining node pods, %s", err)
            return true
@@ -367,21 +372,22 @@ func byNodeDisruptionCost(nodes []candidateNode) func(i int, j int) bool {
 }
 
 // launchReplacementNode launches a replacement node and blocks until it is ready
-func (c *Controller) launchReplacementNode(ctx context.Context, minCost consolidationAction) error {
+func (c *Controller) launchReplacementNode(ctx context.Context, action consolidationAction) error {
    defer metrics.Measure(consolidationReplacementNodeInitializedHistogram)()
-   if len(minCost.oldNodes) != 1 {
-       return fmt.Errorf("expected a single node to replace, found %d", len(minCost.oldNodes))
+   if len(action.oldNodes) != 1 {
+       return fmt.Errorf("expected a single node to replace, found %d", len(action.oldNodes))
    }
+   oldNode := action.oldNodes[0]
 
    // cordon the node before we launch the replacement to prevent new pods from scheduling to the node
-   if err := c.setNodeUnschedulable(ctx, minCost.oldNodes[0].Name, true); err != nil {
-       return fmt.Errorf("cordoning node %s, %w", minCost.oldNodes[0].Name, err)
+   if err := c.setNodeUnschedulable(ctx, action.oldNodes[0].Name, true); err != nil {
+       return fmt.Errorf("cordoning node %s, %w", oldNode.Name, err)
    }
 
-   nodeNames, err := c.provisioner.LaunchNodes(ctx, provisioning.LaunchOptions{RecordPodNomination: false}, minCost.replacementNode)
+   nodeNames, err := c.provisioner.LaunchNodes(ctx, provisioning.LaunchOptions{RecordPodNomination: false}, action.replacementNode)
    if err != nil {
        // uncordon the node as the launch may fail (e.g. ICE or incompatible AMI)
-       err = multierr.Append(err, c.setNodeUnschedulable(ctx, minCost.oldNodes[0].Name, false))
+       err = multierr.Append(err, c.setNodeUnschedulable(ctx, oldNode.Name, false))
        return err
    }
    if len(nodeNames) != 1 {
@@ -391,6 +397,9 @@ func (c *Controller) launchReplacementNode(ctx context.Context, minCost consolid
 
    consolidationNodesCreatedCounter.Add(1)
 
+   // We have the new node created at the API server so mark the old node for deletion
+   c.cluster.MarkForDeletion(oldNode.Name)
+
    var k8Node v1.Node
    // Wait for the node to be ready
    var once sync.Once
@@ -399,7 +408,7 @@ func (c *Controller) launchReplacementNode(ctx context.Context, minCost consolid
            return fmt.Errorf("getting node, %w", err)
        }
        once.Do(func() {
-           c.recorder.LaunchingNodeForConsolidation(&k8Node, minCost.String())
+           c.recorder.LaunchingNodeForConsolidation(&k8Node, action.String())
        })
 
        if _, ok := k8Node.Labels[v1alpha5.LabelNodeInitialized]; !ok {
@@ -410,30 +419,13 @@ func (c *Controller) launchReplacementNode(ctx context.Context, minCost consolid
        return nil
    }, waitRetryOptions...); err != nil {
        // node never become ready, so uncordon the node we were trying to delete and report the error
-       return multierr.Combine(c.setNodeUnschedulable(ctx, minCost.oldNodes[0].Name, false),
+       c.cluster.UnmarkForDeletion(oldNode.Name)
+       return multierr.Combine(c.setNodeUnschedulable(ctx, oldNode.Name, false),
            fmt.Errorf("timed out checking node readiness, %w", err))
    }
    return nil
 }
 
-func (c *Controller) getNodePods(ctx context.Context, nodeName string) ([]*v1.Pod, error) {
-   var podList v1.PodList
-   if err := c.kubeClient.List(ctx, &podList, client.MatchingFields{"spec.nodeName": nodeName}); err != nil {
-       return nil, fmt.Errorf("listing pods, %w", err)
-   }
-   var pods []*v1.Pod
-   for i := range podList.Items {
-       // these pods don't need to be rescheduled
-       if pod.IsOwnedByNode(&podList.Items[i]) ||
-           pod.IsOwnedByDaemonSet(&podList.Items[i]) ||
-           pod.IsTerminal(&podList.Items[i]) {
-           continue
-       }
-       pods = append(pods, &podList.Items[i])
-   }
-   return pods, nil
-}
-
 func (c *Controller) canBeTerminated(node candidateNode, pdbs *PDBLimits) error {
    if !node.DeletionTimestamp.IsZero() {
        return fmt.Errorf("already being deleted")
@@ -481,11 +473,34 @@ func (c *Controller) nodeConsolidationOptionReplaceOrDelete(ctx context.Context,
    defer metrics.Measure(consolidationDurationHistogram.WithLabelValues("Replace/Delete"))()
 
    var stateNodes []*state.Node
+   var markedForDeletionNodes []*state.Node
+   candidateNodeIsDeleting := false
+
    c.cluster.ForEachNode(func(n *state.Node) bool {
-       stateNodes = append(stateNodes, n.DeepCopy())
+       if node.Name == n.Node.Name && n.MarkedForDeletion {
+           candidateNodeIsDeleting = true
+       }
+       if !n.MarkedForDeletion {
+           stateNodes = append(stateNodes, n.DeepCopy())
+       } else {
+           markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy())
+       }
        return true
    })
-   scheduler, err := c.provisioner.NewScheduler(ctx, node.pods, stateNodes, scheduling.SchedulerOptions{
+   // We do one final check to ensure that the node that we are attempting to consolidate isn't
+   // already handled for deletion by some other controller. This could happen if the node was markedForDeletion
+   // between returning the candidateNodes and getting the stateNodes above
+   if candidateNodeIsDeleting {
+       return consolidationAction{result: consolidateResultNoAction}, nil
+   }
+
+   // We get the pods that are on nodes that are deleting
+   deletingNodePods, err := nodeutils.GetNodePods(ctx, c.kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
+   if err != nil {
+       return consolidationAction{result: consolidateResultUnknown}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
+   }
+   pods := append(node.pods, deletingNodePods...)
+   scheduler, err := c.provisioner.NewScheduler(ctx, pods, stateNodes, scheduling.SchedulerOptions{
        SimulationMode: true,
        ExcludeNodes:   []string{node.Name},
    })
@@ -494,7 +509,7 @@ func (c *Controller) nodeConsolidationOptionReplaceOrDelete(ctx context.Context,
        return consolidationAction{result: consolidateResultUnknown}, fmt.Errorf("creating scheduler, %w", err)
    }
 
-   newNodes, inflightNodes, err := scheduler.Solve(ctx, node.pods)
+   newNodes, inflightNodes, err := scheduler.Solve(ctx, pods)
    if err != nil {
        return consolidationAction{result: consolidateResultUnknown}, fmt.Errorf("simulating scheduling, %w", err)
    }
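For orientation, the marking lifecycle this file adds boils down to: mark the old node only after the replacement exists at the API server, and roll the mark back if the replacement never initializes. Below is a minimal sketch of that ordering, not part of this commit; launch and waitUntilReady are hypothetical stand-ins for LaunchNodes and the readiness retry loop above, while MarkForDeletion/UnmarkForDeletion are the real methods added in pkg/controllers/state/cluster.go.

package sketch

import (
    "context"
    "fmt"

    "github.com/aws/karpenter/pkg/controllers/state"
)

// replaceNode mirrors the ordering used by launchReplacementNode: mark the old node
// as pending deletion once the replacement has been created, and undo the mark if the
// replacement never becomes ready so the old node's capacity is counted again.
func replaceNode(
    ctx context.Context,
    cluster *state.Cluster,
    oldNodeName string,
    launch func(ctx context.Context) (string, error), // hypothetical stand-in for LaunchNodes
    waitUntilReady func(ctx context.Context, nodeName string) error, // hypothetical readiness wait
) error {
    newNodeName, err := launch(ctx)
    if err != nil {
        return fmt.Errorf("launching replacement, %w", err)
    }

    // From this point on, schedulers simulating the cluster should treat the old
    // node's pods as workload that needs a new home.
    cluster.MarkForDeletion(oldNodeName)

    if err := waitUntilReady(ctx, newNodeName); err != nil {
        // Replacement never initialized: restore the old node's standing in cluster state.
        cluster.UnmarkForDeletion(oldNodeName)
        return fmt.Errorf("timed out waiting for %s, %w", newNodeName, err)
    }
    return nil
}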

‎pkg/controllers/consolidation/suite_test.go

+154

@@ -53,6 +53,7 @@ var ctx context.Context
 var env *test.Environment
 var cluster *state.Cluster
 var controller *consolidation.Controller
+var provisioningController *provisioning.Controller
 var provisioner *provisioning.Provisioner
 var cloudProvider *fake.CloudProvider
 var clientSet *kubernetes.Clientset
@@ -82,6 +83,7 @@ var _ = BeforeSuite(func() {
        clientSet = kubernetes.NewForConfigOrDie(e.Config)
        recorder = test.NewEventRecorder()
        provisioner = provisioning.NewProvisioner(ctx, cfg, env.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster)
+       provisioningController = provisioning.NewController(env.Client, provisioner, recorder)
    })
    Expect(env.Start()).To(Succeed(), "Failed to start environment")
 })
@@ -1307,6 +1309,158 @@ var _ = Describe("Empty Nodes", func() {
    })
 })
 
+var _ = Describe("Parallelization", func() {
+   It("should schedule an additional node when receiving pending pods while consolidating", func() {
+       labels := map[string]string{
+           "app": "test",
+       }
+       // create our RS so we can link a pod to it
+       rs := test.ReplicaSet()
+       ExpectApplied(ctx, env.Client, rs)
+       Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+       pod := test.Pod(test.PodOptions{
+           ObjectMeta: metav1.ObjectMeta{Labels: labels,
+               OwnerReferences: []metav1.OwnerReference{
+                   {
+                       APIVersion:         "apps/v1",
+                       Kind:               "ReplicaSet",
+                       Name:               rs.Name,
+                       UID:                rs.UID,
+                       Controller:         aws.Bool(true),
+                       BlockOwnerDeletion: aws.Bool(true),
+                   },
+               }}})
+
+       prov := test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: aws.Bool(true)}})
+
+       // Add a finalizer to the node so that it sticks around for the scheduling loop
+       node := test.Node(test.NodeOptions{
+           ObjectMeta: metav1.ObjectMeta{
+               Labels: map[string]string{
+                   v1alpha5.ProvisionerNameLabelKey: prov.Name,
+                   v1.LabelInstanceTypeStable:       mostExpensiveInstance.Name(),
+                   v1alpha5.LabelCapacityType:       mostExpensiveOffering.CapacityType,
+                   v1.LabelTopologyZone:             mostExpensiveOffering.Zone,
+               },
+               Finalizers: []string{"karpenter.sh/test-finalizer"},
+           },
+           Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")}})
+
+       ExpectApplied(ctx, env.Client, rs, pod, node, prov)
+       ExpectMakeNodesReady(ctx, env.Client, node)
+       ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
+       ExpectManualBinding(ctx, env.Client, pod, node)
+       ExpectScheduled(ctx, env.Client, pod)
+       Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed())
+
+       fakeClock.Step(10 * time.Minute)
+
+       // Run the processing loop in parallel in the background with environment context
+       go func() {
+           _, err := controller.ProcessCluster(env.Ctx)
+           Expect(err).ToNot(HaveOccurred())
+       }()
+
+       Eventually(func(g Gomega) {
+           // should create a new node as there is a cheaper one that can hold the pod
+           nodes := &v1.NodeList{}
+           g.Expect(env.Client.List(ctx, nodes)).To(Succeed())
+           g.Expect(len(nodes.Items)).To(Equal(2))
+       }).Should(Succeed())
+
+       // Add a new pending pod that should schedule while node is not yet deleted
+       pods := ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, test.UnschedulablePod())
+       nodes := &v1.NodeList{}
+       Expect(env.Client.List(ctx, nodes)).To(Succeed())
+       Expect(len(nodes.Items)).To(Equal(3))
+       Expect(pods[0].Spec.NodeName).NotTo(Equal(node.Name))
+   })
+   It("should not consolidate a node that is launched for pods on a deleting node", func() {
+       labels := map[string]string{
+           "app": "test",
+       }
+       // create our RS so we can link a pod to it
+       rs := test.ReplicaSet()
+       ExpectApplied(ctx, env.Client, rs)
+       Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())
+
+       prov := test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: aws.Bool(true)}})
+       podOpts := test.PodOptions{
+           ObjectMeta: metav1.ObjectMeta{
+               Labels: labels,
+               OwnerReferences: []metav1.OwnerReference{
+                   {
+                       APIVersion:         "apps/v1",
+                       Kind:               "ReplicaSet",
+                       Name:               rs.Name,
+                       UID:                rs.UID,
+                       Controller:         aws.Bool(true),
+                       BlockOwnerDeletion: aws.Bool(true),
+                   },
+               },
+           },
+           ResourceRequirements: v1.ResourceRequirements{
+               Requests: v1.ResourceList{
+                   v1.ResourceCPU: resource.MustParse("1"),
+               },
+           },
+       }
+
+       var pods []*v1.Pod
+       for i := 0; i < 5; i++ {
+           pod := test.UnschedulablePod(podOpts)
+           pods = append(pods, pod)
+       }
+       ExpectApplied(ctx, env.Client, rs, prov)
+       ExpectProvisioned(ctx, env.Client, provisioningController, pods...)
+
+       nodeList := &v1.NodeList{}
+       Expect(env.Client.List(ctx, nodeList)).To(Succeed())
+       Expect(len(nodeList.Items)).To(Equal(1))
+
+       // Update cluster state with new node
+       ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(&nodeList.Items[0]))
+
+       // Reset the bindings so we can re-record bindings
+       recorder.ResetBindings()
+
+       // Mark the node for deletion and re-trigger reconciliation
+       oldNodeName := nodeList.Items[0].Name
+       cluster.MarkForDeletion(nodeList.Items[0].Name)
+       ExpectProvisionedNoBinding(ctx, env.Client, provisioningController)
+
+       // Make sure that the cluster state is aware of the current node state
+       Expect(env.Client.List(ctx, nodeList)).To(Succeed())
+       Expect(len(nodeList.Items)).To(Equal(2))
+       newNode, _ := lo.Find(nodeList.Items, func(n v1.Node) bool { return n.Name != oldNodeName })
+
+       for i := range nodeList.Items {
+           node := nodeList.Items[i]
+           ExpectMakeNodesReady(ctx, env.Client, &node)
+           ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(&node))
+       }
+
+       // Wait for the nomination cache to expire
+       time.Sleep(time.Second * 11)
+
+       // Re-create the pods to re-bind them
+       for i := 0; i < 2; i++ {
+           ExpectDeleted(ctx, env.Client, pods[i])
+           pod := test.UnschedulablePod(podOpts)
+           ExpectApplied(ctx, env.Client, pod)
+           ExpectManualBinding(ctx, env.Client, pod, &newNode)
+       }
+
+       // Trigger a reconciliation run which should take into account the deleting node
+       // Consolidation shouldn't trigger additional actions
+       fakeClock.Step(10 * time.Minute)
+       result, err := controller.ProcessCluster(env.Ctx)
+       Expect(err).ToNot(HaveOccurred())
+       Expect(result).To(Equal(consolidation.ProcessResultNothingToDo))
+   })
+})
+
 func leastExpensiveInstanceWithZone(zone string) cloudprovider.InstanceType {
    for _, elem := range onDemandInstances {
        if hasZone(elem.Offerings(), zone) {

‎pkg/controllers/consolidation/types.go

+3

@@ -41,6 +41,7 @@ const (
    consolidateResultDelete
    consolidateResultDeleteEmpty
    consolidateResultReplace
+   consolidateResultNoAction
 )
 
 func (r consolidateResult) String() string {
@@ -55,6 +56,8 @@ func (r consolidateResult) String() string {
        return "Delete (empty node)"
    case consolidateResultReplace:
        return "Replace"
+   case consolidateResultNoAction:
+       return "NoAction"
    default:
        return fmt.Sprintf("Unknown (%d)", r)
    }

‎pkg/controllers/node/expiration.go

+3

@@ -47,6 +47,9 @@ func (r *Expiration) Reconcile(ctx context.Context, provisioner *v1alpha5.Provis
    expirationTime := node.CreationTimestamp.Add(expirationTTL)
    if r.clock.Now().After(expirationTime) {
        logging.FromContext(ctx).Infof("Triggering termination for expired node after %s (+%s)", expirationTTL, time.Since(expirationTime))
+
+       // The delete operation implicitly marks the node for deletion for handling with scheduling
+       // This also implicitly triggers provisioning of the new node since at least one pod should go pending
        if err := r.kubeClient.Delete(ctx, node); err != nil {
            return reconcile.Result{}, fmt.Errorf("deleting node, %w", err)
        }

‎pkg/controllers/provisioning/provisioner.go

+22 -3

@@ -25,6 +25,7 @@ import (
 
    "github.com/imdario/mergo"
    "github.com/prometheus/client_golang/prometheus"
+   "github.com/samber/lo"
    "go.uber.org/multierr"
    appsv1 "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
@@ -47,6 +48,7 @@ import (
    "github.com/aws/karpenter/pkg/metrics"
    "github.com/aws/karpenter/pkg/scheduling"
    "github.com/aws/karpenter/pkg/utils/injection"
+   "github.com/aws/karpenter/pkg/utils/node"
    "github.com/aws/karpenter/pkg/utils/pod"
    "github.com/aws/karpenter/pkg/utils/resources"
 )
@@ -140,16 +142,33 @@ func (p *Provisioner) Provision(ctx context.Context) error {
    // scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods
    // that have bound which we then provision new un-needed capacity for.
    var stateNodes []*state.Node
+   var markedForDeletionNodes []*state.Node
    p.cluster.ForEachNode(func(node *state.Node) bool {
-       stateNodes = append(stateNodes, node.DeepCopy())
+       // We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
+       // as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
+       // the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
+       if !node.MarkedForDeletion {
+           stateNodes = append(stateNodes, node.DeepCopy())
+       } else {
+           markedForDeletionNodes = append(markedForDeletionNodes, node.DeepCopy())
+       }
        return true
    })
 
    // Get pods, exit if nothing to do
-   pods, err := p.getPods(ctx)
+   pendingPods, err := p.getPendingPods(ctx)
+   if err != nil {
+       return err
+   }
+   // Get pods from nodes that are preparing for deletion
+   // We do this after getting the pending pods so that we undershoot if pods are
+   // actively migrating from a node that is being deleted
+   // NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
+   deletingNodePods, err := node.GetNodePods(ctx, p.kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
    if err != nil {
        return err
    }
+   pods := append(pendingPods, deletingNodePods...)
    if len(pods) == 0 {
        return nil
    }
@@ -194,7 +213,7 @@ func (p *Provisioner) LaunchNodes(ctx context.Context, opts LaunchOptions, nodes
    return nodeNames, nil
 }
 
-func (p *Provisioner) getPods(ctx context.Context) ([]*v1.Pod, error) {
+func (p *Provisioner) getPendingPods(ctx context.Context) ([]*v1.Pod, error) {
    var podList v1.PodList
    if err := p.kubeClient.List(ctx, &podList, client.MatchingFields{"spec.nodeName": ""}); err != nil {
        return nil, fmt.Errorf("listing pods, %w", err)
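The same two-step pattern, exclude marked nodes from schedulable capacity and then add their pods to the workload being scheduled, is what Provision now does above and what the consolidation simulation does as well. The following is a condensed, self-contained sketch of that pattern and not the project's code; Cluster.ForEachNode, Node.MarkedForDeletion, and node.GetNodePods are the real APIs from this commit, while the surrounding function is illustrative only.

package sketch

import (
    "context"

    "github.com/samber/lo"
    v1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    "github.com/aws/karpenter/pkg/controllers/state"
    nodeutils "github.com/aws/karpenter/pkg/utils/node"
)

// schedulingInputs partitions cluster state into capacity the scheduler may keep using
// and nodes that are going away, then folds the pods on the departing nodes into the
// set of pods that still need a home.
func schedulingInputs(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, pendingPods []*v1.Pod) ([]*state.Node, []*v1.Pod, error) {
    var capacity, deleting []*state.Node
    cluster.ForEachNode(func(n *state.Node) bool {
        if n.MarkedForDeletion {
            deleting = append(deleting, n.DeepCopy())
        } else {
            capacity = append(capacity, n.DeepCopy())
        }
        return true
    })

    // Pods still running on the departing nodes must be scheduled as if they were pending.
    deletingNodePods, err := nodeutils.GetNodePods(ctx, kubeClient,
        lo.Map(deleting, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
    if err != nil {
        return nil, nil, err
    }
    return capacity, append(pendingPods, deletingNodePods...), nil
}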

‎pkg/controllers/provisioning/suite_test.go

+50 -3

@@ -47,6 +47,9 @@ import (
 
 var ctx context.Context
 var fakeClock *clock.FakeClock
+var cluster *state.Cluster
+var nodeController *state.NodeController
+var cloudProvider cloudprovider.CloudProvider
 var controller *provisioning.Controller
 var env *test.Environment
 var recorder *test.EventRecorder
@@ -61,12 +64,12 @@ func TestAPIs(t *testing.T) {
 
 var _ = BeforeSuite(func() {
    env = test.NewEnvironment(ctx, func(e *test.Environment) {
-       cloudProvider := &fake.CloudProvider{}
-       recorder = test.NewEventRecorder()
+       cloudProvider = &fake.CloudProvider{}
        cfg = test.NewConfig()
        recorder = test.NewEventRecorder()
        fakeClock = clock.NewFakeClock(time.Now())
-       cluster := state.NewCluster(fakeClock, cfg, e.Client, cloudProvider)
+       cluster = state.NewCluster(fakeClock, cfg, e.Client, cloudProvider)
+       nodeController = state.NewNodeController(e.Client, cluster)
        prov := provisioning.NewProvisioner(ctx, cfg, e.Client, corev1.NewForConfigOrDie(e.Config), recorder, cloudProvider, cluster)
        controller = provisioning.NewController(e.Client, prov, recorder)
        instanceTypes, _ := cloudProvider.GetInstanceTypes(context.Background(), nil)
@@ -78,6 +81,11 @@ var _ = BeforeSuite(func() {
    Expect(env.Start()).To(Succeed(), "Failed to start environment")
 })
 
+var _ = BeforeEach(func() {
+   recorder.Reset()
+   cluster = state.NewCluster(fakeClock, cfg, env.Client, cloudProvider)
+})
+
 var _ = AfterSuite(func() {
    Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
 })
@@ -182,6 +190,45 @@ var _ = Describe("Provisioning", func() {
            ExpectScheduled(ctx, env.Client, pod)
        }
    })
+   It("should schedule all pods on one node when node is in deleting state", func() {
+       provisioner := test.Provisioner()
+       its, err := cloudProvider.GetInstanceTypes(ctx, provisioner)
+       Expect(err).To(BeNil())
+       node := test.Node(test.NodeOptions{
+           ObjectMeta: metav1.ObjectMeta{
+               Labels: map[string]string{
+                   v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+                   v1.LabelInstanceTypeStable:       its[0].Name(),
+               },
+               Finalizers: []string{v1alpha5.TerminationFinalizer},
+           }},
+       )
+       ExpectApplied(ctx, env.Client, node, provisioner)
+       ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
+
+       // Schedule 3 pods to the node that currently exists
+       for i := 0; i < 3; i++ {
+           pod := test.UnschedulablePod()
+           ExpectApplied(ctx, env.Client, pod)
+           ExpectManualBinding(ctx, env.Client, pod, node)
+       }
+
+       // Node shouldn't fully delete since it has a finalizer
+       Expect(env.Client.Delete(ctx, node)).To(Succeed())
+       ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
+
+       // Provision without a binding since some pods will already be bound
+       // Should all schedule to the new node, ignoring the old node
+       ExpectProvisionedNoBinding(ctx, env.Client, controller, test.UnschedulablePod(), test.UnschedulablePod())
+       nodes := &v1.NodeList{}
+       Expect(env.Client.List(ctx, nodes)).To(Succeed())
+       Expect(len(nodes.Items)).To(Equal(2))
+
+       // Scheduler should attempt to schedule all the pods to the new node
+       recorder.ForEachBinding(func(p *v1.Pod, n *v1.Node) {
+           Expect(n.Name).ToNot(Equal(node.Name))
+       })
+   })
 	Context("Resource Limits", func() {
        It("should not schedule when limits are exceeded", func() {
            ExpectApplied(ctx, env.Client, test.Provisioner(test.ProvisionerOptions{

‎pkg/controllers/state/cluster.go

+39 -12

@@ -117,6 +117,9 @@ type Node struct {
    PodTotalRequests v1.ResourceList
    // PodTotalLimits is the total resource limits scheduled to this node
    PodTotalLimits v1.ResourceList
+   // MarkedForDeletion marks this node to say that there is some controller that is
+   // planning to delete this node so consider pods that are present on it available for scheduling
+   MarkedForDeletion bool
 }
 
 // ForPodsWithAntiAffinity calls the supplied function once for each pod with required anti affinity terms that is
@@ -177,18 +180,37 @@ func (c *Cluster) NominateNodeForPod(nodeName string) {
    c.nominatedNodes.SetDefault(nodeName, nil)
 }
 
+// UnmarkForDeletion removes the marking on the node as a node the controller intends to delete
+func (c *Cluster) UnmarkForDeletion(nodeName string) {
+   c.mu.Lock()
+   defer c.mu.Unlock()
+   if _, ok := c.nodes[nodeName]; ok {
+       c.nodes[nodeName].MarkedForDeletion = false
+   }
+}
+
+// MarkForDeletion marks the node as pending deletion in the internal cluster state
+func (c *Cluster) MarkForDeletion(nodeName string) {
+   c.mu.Lock()
+   defer c.mu.Unlock()
+   if _, ok := c.nodes[nodeName]; ok {
+       c.nodes[nodeName].MarkedForDeletion = true
+   }
+}
+
 // newNode always returns a node, even if some portion of the update has failed
 func (c *Cluster) newNode(ctx context.Context, node *v1.Node) (*Node, error) {
    n := &Node{
-       Node:          node,
-       Capacity:      v1.ResourceList{},
-       Allocatable:   v1.ResourceList{},
-       Available:     v1.ResourceList{},
-       HostPortUsage: scheduling.NewHostPortUsage(),
-       VolumeUsage:   scheduling.NewVolumeLimits(c.kubeClient),
-       VolumeLimits:  scheduling.VolumeCount{},
-       podRequests:   map[types.NamespacedName]v1.ResourceList{},
-       podLimits:     map[types.NamespacedName]v1.ResourceList{},
+       Node:              node,
+       Capacity:          v1.ResourceList{},
+       Allocatable:       v1.ResourceList{},
+       Available:         v1.ResourceList{},
+       HostPortUsage:     scheduling.NewHostPortUsage(),
+       VolumeUsage:       scheduling.NewVolumeLimits(c.kubeClient),
+       VolumeLimits:      scheduling.VolumeCount{},
+       MarkedForDeletion: !node.DeletionTimestamp.IsZero(),
+       podRequests:       map[types.NamespacedName]v1.ResourceList{},
+       podLimits:         map[types.NamespacedName]v1.ResourceList{},
    }
    if err := multierr.Combine(
        c.populateCapacity(ctx, node, n),
@@ -319,10 +341,15 @@ func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error {
    oldNode, ok := c.nodes[node.Name]
    // If the old node existed and its initialization status changed, we want to reconsider consolidation. This handles
    // a situation where we re-start with an unready node and it becomes ready later.
-   if ok && oldNode.Node.Labels[v1alpha5.LabelNodeInitialized] != n.Node.Labels[v1alpha5.LabelNodeInitialized] {
-       c.recordConsolidationChange()
+   if ok {
+       if oldNode.Node.Labels[v1alpha5.LabelNodeInitialized] != n.Node.Labels[v1alpha5.LabelNodeInitialized] {
+           c.recordConsolidationChange()
+       }
+       // We mark the node for deletion either:
+       // 1. If the DeletionTimestamp is set (the node is explicitly being deleted)
+       // 2. If the last state of the node has the node MarkedForDeletion
+       n.MarkedForDeletion = n.MarkedForDeletion || oldNode.MarkedForDeletion
    }
-
    c.nodes[node.Name] = n
 
    if node.DeletionTimestamp != nil {
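Two details of the new flag are easy to miss in the hunks above: a node can become MarkedForDeletion either explicitly (MarkForDeletion) or implicitly once its DeletionTimestamp is set, and the mark survives later node updates because updateNode ORs it with the previous state. A small illustrative helper follows; it is not part of this commit and only assumes the APIs shown in the diff.

package sketch

import (
    "github.com/aws/karpenter/pkg/controllers/state"
)

// isPendingDeletion reports whether cluster state currently considers nodeName to be
// going away. Per the diff above, the flag can be set two ways:
//  1. explicitly, via cluster.MarkForDeletion(nodeName), which is a no-op for nodes the
//     cluster state does not yet track (the `if _, ok := c.nodes[nodeName]` guard), or
//  2. implicitly, when the Node object carries a DeletionTimestamp; newNode seeds the
//     flag from it and updateNode carries the previous value forward on every update.
func isPendingDeletion(cluster *state.Cluster, nodeName string) bool {
    pending := false
    cluster.ForEachNode(func(n *state.Node) bool {
        if n.Node.Name == nodeName {
            pending = n.MarkedForDeletion
            return false // stop iterating once the node is found
        }
        return true
    })
    return pending
}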

‎pkg/controllers/state/suite_test.go

+32

@@ -508,6 +508,28 @@ var _ = Describe("Node Resource Level", func() {
        ExpectNodeResourceRequest(node, v1.ResourceCPU, "2.5")
        ExpectNodeResourceRequest(node, v1.ResourceMemory, "2Gi")
    })
+   It("should mark node for deletion when node is deleted", func() {
+       node := test.Node(test.NodeOptions{
+           ObjectMeta: metav1.ObjectMeta{
+               Labels: map[string]string{
+                   v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
+                   v1.LabelInstanceTypeStable:       cloudProvider.InstanceTypes[0].Name(),
+               },
+               Finalizers: []string{v1alpha5.TerminationFinalizer},
+           },
+           Allocatable: map[v1.ResourceName]resource.Quantity{
+               v1.ResourceCPU: resource.MustParse("4"),
+           }},
+       )
+       ExpectApplied(ctx, env.Client, node)
+
+       ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
+       Expect(env.Client.Delete(ctx, node)).To(Succeed())
+
+       ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
+       ExpectNodeExists(ctx, env.Client, node.Name)
+       ExpectNodeDeletionMarked(node)
+   })
 })
 
 var _ = Describe("Pod Anti-Affinity", func() {
@@ -722,3 +744,13 @@ func ExpectNodeDaemonSetRequested(node *v1.Node, resourceName v1.ResourceName, a
        return false
    })
 }
+
+func ExpectNodeDeletionMarked(node *v1.Node) {
+   cluster.ForEachNode(func(n *state.Node) bool {
+       if n.Node.Name != node.Name {
+           return true
+       }
+       Expect(n.MarkedForDeletion).To(BeTrue())
+       return false
+   })
+}

‎pkg/utils/node/node.go

+46 (new file)

@@ -0,0 +1,46 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package node
+
+import (
+   "context"
+   "fmt"
+
+   v1 "k8s.io/api/core/v1"
+   "sigs.k8s.io/controller-runtime/pkg/client"
+
+   "github.com/aws/karpenter/pkg/utils/pod"
+)
+
+// GetNodePods gets the list of schedulable pods from a variadic list of nodes
+func GetNodePods(ctx context.Context, kubeClient client.Client, nodes ...*v1.Node) ([]*v1.Pod, error) {
+   var pods []*v1.Pod
+   for _, node := range nodes {
+       var podList v1.PodList
+       if err := kubeClient.List(ctx, &podList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
+           return nil, fmt.Errorf("listing pods, %w", err)
+       }
+       for i := range podList.Items {
+           // these pods don't need to be rescheduled
+           if pod.IsOwnedByNode(&podList.Items[i]) ||
+               pod.IsOwnedByDaemonSet(&podList.Items[i]) ||
+               pod.IsTerminal(&podList.Items[i]) {
+               continue
+           }
+           pods = append(pods, &podList.Items[i])
+       }
+   }
+   return pods, nil
+}
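A brief usage sketch of the new helper follows; the caller is hypothetical, but the filtering behavior described in the comments is that of GetNodePods above.

package sketch

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    nodeutils "github.com/aws/karpenter/pkg/utils/node"
)

// printPodsToEvacuate lists every pod that would need a new home if the given nodes
// went away. Node-owned (static), DaemonSet-owned and terminal pods are already
// filtered out by GetNodePods, so no extra filtering is needed here.
func printPodsToEvacuate(ctx context.Context, kubeClient client.Client, nodes []*v1.Node) error {
    // GetNodePods lists with a "spec.nodeName" field selector, so a cached
    // controller-runtime client needs that field index registered.
    pods, err := nodeutils.GetNodePods(ctx, kubeClient, nodes...)
    if err != nil {
        return err
    }
    for _, p := range pods {
        fmt.Printf("pod %s/%s on %s needs rescheduling\n", p.Namespace, p.Name, p.Spec.NodeName)
    }
    return nil
}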

‎test/pkg/environment/expectations.go

+7

@@ -218,6 +218,13 @@ func (env *Environment) ExpectCreatedNodeCount(comparator string, nodeCount int)
        fmt.Sprintf("expected %d created nodes, had %d", nodeCount, env.Monitor.CreatedNodes()))
 }
 
+func (env *Environment) EventuallyExpectCreatedNodeCount(comparator string, nodeCount int) {
+   Eventually(func(g Gomega) {
+       g.Expect(env.Monitor.CreatedNodes()).To(BeNumerically(comparator, nodeCount),
+           fmt.Sprintf("expected %d created nodes, had %d", nodeCount, env.Monitor.CreatedNodes()))
+   }).Should(Succeed())
+}
+
 func (env *Environment) GetNode(nodeName string) v1.Node {
    var node v1.Node
    Expect(env.Client.Get(env.Context, types.NamespacedName{Name: nodeName}, &node)).To(Succeed())
+125 (new integration test file; path not shown in this view)

@@ -0,0 +1,125 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration_test
+
+import (
+   . "github.com/onsi/ginkgo/v2"
+   . "github.com/onsi/gomega"
+   v1 "k8s.io/api/core/v1"
+   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+   "k8s.io/apimachinery/pkg/labels"
+   "k8s.io/apimachinery/pkg/types"
+   "k8s.io/apimachinery/pkg/util/intstr"
+   "knative.dev/pkg/ptr"
+
+   "github.com/aws/karpenter/pkg/apis/awsnodetemplate/v1alpha1"
+   "github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
+   awsv1alpha1 "github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
+   "github.com/aws/karpenter/pkg/test"
+)
+
+var _ = Describe("Expiration", func() {
+   It("should expire the node after the TTLSecondsUntilExpired is reached", func() {
+       provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
+           SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
+           SubnetSelector:        map[string]string{"karpenter.sh/discovery": env.ClusterName},
+       }})
+       provisioner := test.Provisioner(test.ProvisionerOptions{
+           ProviderRef:            &v1alpha5.ProviderRef{Name: provider.Name},
+           TTLSecondsUntilExpired: ptr.Int64(30),
+       })
+       var numPods int32 = 3
+
+       dep := test.Deployment(test.DeploymentOptions{
+           Replicas: numPods,
+           PodOptions: test.PodOptions{
+               ObjectMeta: metav1.ObjectMeta{
+                   Labels: map[string]string{"app": "large-app"},
+               },
+           },
+       })
+
+       env.ExpectCreatedNodeCount("==", 0)
+       env.ExpectCreated(provisioner, provider, dep)
+
+       // We don't care if the pod goes healthy, just if the node is expired
+       env.EventuallyExpectCreatedNodeCount("==", 1)
+       node := env.Monitor.GetCreatedNodes()[0]
+
+       // Eventually expect the node to be gone
+       env.EventuallyExpectNotFound(&node)
+   })
+   It("should replace expired node with a single node and schedule all pods", func() {
+       provider := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: awsv1alpha1.AWS{
+           SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": env.ClusterName},
+           SubnetSelector:        map[string]string{"karpenter.sh/discovery": env.ClusterName},
+       }})
+       provisioner := test.Provisioner(test.ProvisionerOptions{
+           ProviderRef: &v1alpha5.ProviderRef{Name: provider.Name},
+       })
+       var numPods int32 = 5
+
+       // We should setup a PDB that will only allow a minimum of 1 pod to be pending at a time
+       minAvailable := intstr.FromInt(int(numPods) - 1)
+       pdb := test.PodDisruptionBudget(test.PDBOptions{
+           Labels: map[string]string{
+               "app": "large-app",
+           },
+           MinAvailable: &minAvailable,
+       })
+       dep := test.Deployment(test.DeploymentOptions{
+           Replicas: numPods,
+           PodOptions: test.PodOptions{
+               ObjectMeta: metav1.ObjectMeta{
+                   Labels: map[string]string{"app": "large-app"},
+               },
+           },
+       })
+
+       selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
+       env.ExpectCreatedNodeCount("==", 0)
+       env.ExpectCreated(provisioner, provider, pdb, dep)
+
+       env.EventuallyExpectHealthyPodCount(selector, int(numPods))
+       env.ExpectCreatedNodeCount("==", 1)
+
+       node := env.Monitor.GetCreatedNodes()[0]
+
+       // Reset the monitor so that we can expect a single node to be spun up after expiration
+       env.Monitor.Reset()
+
+       // Set the TTLSecondsUntilExpired to get the node deleted
+       provisioner.Spec.TTLSecondsUntilExpired = ptr.Int64(60)
+       env.ExpectUpdate(provisioner)
+
+       // Eventually the node deletion timestamp will be set
+       Eventually(func(g Gomega) {
+           n := &v1.Node{}
+           g.Expect(env.Client.Get(env.Context, types.NamespacedName{Name: node.Name}, n)).Should(Succeed())
+           g.Expect(n.DeletionTimestamp.IsZero()).Should(BeFalse())
+       }).Should(Succeed())
+
+       // Remove the TTLSecondsUntilExpired to make sure new node isn't deleted
+       provisioner.Spec.TTLSecondsUntilExpired = nil
+       env.ExpectUpdate(provisioner)
+
+       // After the deletion timestamp is set and all pods are drained
+       // the node should be gone
+       env.EventuallyExpectNotFound(&node)
+
+       env.EventuallyExpectHealthyPodCount(selector, int(numPods))
+       env.ExpectCreatedNodeCount("==", 1)
+   })
+})
