
Commit da8b0e5

Authored Sep 13, 2022
feat: consolidation enhancements (#2488)
1 parent ef27e73 commit da8b0e5

File tree: 7 files changed (+57 −27 lines)

 

‎charts/karpenter/templates/clusterrole.yaml

+1 −1
```diff
@@ -23,7 +23,7 @@ rules:
     resources: ["storageclasses", "csinodes"]
     verbs: ["get", "watch", "list"]
   - apiGroups: ["apps"]
-    resources: ["daemonsets", "replicasets", "statefulsets"]
+    resources: ["daemonsets", "deployments", "replicasets", "statefulsets"]
     verbs: ["list", "watch"]
   - apiGroups: ["admissionregistration.k8s.io"]
     resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
```

‎pkg/cloudprovider/aws/amifamily/ami.go

+1 −1
```diff
@@ -90,7 +90,7 @@ func (p *AMIProvider) getDefaultAMIFromSSM(ctx context.Context, _ cloudprovider.
 	}
 	ami := aws.StringValue(output.Parameter.Value)
 	p.ssmCache.SetDefault(ssmQuery, ami)
-	if p.cm.HasChanged("ssmquery", ami+ssmQuery) {
+	if p.cm.HasChanged("ssmquery-"+ssmQuery, ami) {
 		logging.FromContext(ctx).Debugf("Discovered %s for query %q", ami, ssmQuery)
 	}
 	return ami, nil
```
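
The `HasChanged` call above is now keyed by the SSM query itself, so the debug log fires only when the AMI resolved for a given query actually changes; the same gating is applied to the launch-template discovery log in the next file. As a rough illustration of the idea — the hash-based `changeMonitor` below is a hypothetical sketch, not Karpenter's actual `cm` implementation — a per-key change monitor could look like this:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// changeMonitor is a hypothetical sketch with the same shape as the
// cm.HasChanged(key, value) call sites in the diff above: it remembers a hash
// of the last value seen for each key and reports true only when that value
// changes (or the key is observed for the first time).
type changeMonitor struct {
	mu   sync.Mutex
	seen map[string]uint64
}

func newChangeMonitor() *changeMonitor {
	return &changeMonitor{seen: map[string]uint64{}}
}

// HasChanged returns true the first time a key is observed and whenever the
// value associated with that key differs from the previously observed value.
func (c *changeMonitor) HasChanged(key, value string) bool {
	h := fnv.New64a()
	h.Write([]byte(value))
	sum := h.Sum64()

	c.mu.Lock()
	defer c.mu.Unlock()
	if prev, ok := c.seen[key]; ok && prev == sum {
		return false
	}
	c.seen[key] = sum
	return true
}

func main() {
	cm := newChangeMonitor()
	query := "ssmquery-/aws/service/eks/optimized-ami/1.23/amazon-linux-2/recommended/image_id" // hypothetical query
	fmt.Println(cm.HasChanged(query, "ami-111")) // true: first observation, logged once
	fmt.Println(cm.HasChanged(query, "ami-111")) // false: unchanged, no repeated log spam
	fmt.Println(cm.HasChanged(query, "ami-222")) // true: the AMI behind this query changed
}
```

Keying by `"ssmquery-"+ssmQuery` (rather than a single `"ssmquery"` key whose value mixed the AMI and the query) keeps each query's state independent, so alternating between queries no longer re-triggers the log.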

‎pkg/cloudprovider/aws/launchtemplate.go

+3 −1
```diff
@@ -160,7 +160,9 @@ func (p *LaunchTemplateProvider) ensureLaunchTemplate(ctx context.Context, optio
 	} else if len(output.LaunchTemplates) != 1 {
 		return nil, fmt.Errorf("expected to find one launch template, but found %d", len(output.LaunchTemplates))
 	} else {
-		logging.FromContext(ctx).Debugf("Discovered launch template %s", name)
+		if p.cm.HasChanged("launchtemplate-"+name, name) {
+			logging.FromContext(ctx).Debugf("Discovered launch template %s", name)
+		}
 		launchTemplate = output.LaunchTemplates[0]
 	}
 	p.cache.SetDefault(name, launchTemplate)
```

‎pkg/controllers/consolidation/controller.go

+33 −13
```diff
@@ -28,7 +28,6 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
-
 	"k8s.io/utils/clock"
 	"knative.dev/pkg/logging"
 	"knative.dev/pkg/ptr"
@@ -109,12 +108,13 @@ func (c *Controller) run(ctx context.Context) {
 			continue
 		}
 
-		// don't consolidate as we recently scaled down too soon
+		// don't consolidate as we recently scaled up/down too soon
 		stabilizationTime := c.clock.Now().Add(-c.stabilizationWindow(ctx))
 		// capture the state before we process so if something changes during consolidation we'll re-look
 		// immediately
 		clusterState := c.cluster.ClusterConsolidationState()
-		if c.cluster.LastNodeDeletionTime().Before(stabilizationTime) {
+		if c.cluster.LastNodeDeletionTime().Before(stabilizationTime) &&
+			c.cluster.LastNodeCreationTime().Before(stabilizationTime) {
 			result, err := c.ProcessCluster(ctx)
 			if err != nil {
 				logging.FromContext(ctx).Errorf("consolidating cluster, %s", err)
@@ -480,10 +480,12 @@ func (c *Controller) nodeConsolidationOptionReplaceOrDelete(ctx context.Context,
 		if node.Name == n.Node.Name && n.MarkedForDeletion {
 			candidateNodeIsDeleting = true
 		}
-		if !n.MarkedForDeletion {
-			stateNodes = append(stateNodes, n.DeepCopy())
-		} else {
-			markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy())
+		if n.Node.Name != node.Name {
+			if !n.MarkedForDeletion {
+				stateNodes = append(stateNodes, n.DeepCopy())
+			} else {
+				markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy())
+			}
 		}
 		return true
 	})
@@ -502,7 +504,6 @@ func (c *Controller) nodeConsolidationOptionReplaceOrDelete(ctx context.Context,
 	pods := append(node.pods, deletingNodePods...)
 	scheduler, err := c.provisioner.NewScheduler(ctx, pods, stateNodes, scheduling.SchedulerOptions{
 		SimulationMode: true,
-		ExcludeNodes:   []string{node.Name},
 	})
 
 	if err != nil {
@@ -576,11 +577,30 @@ func (c *Controller) hasPendingPods(ctx context.Context) bool {
 	return false
 }
 
+func (c *Controller) deploymentsReady(ctx context.Context) bool {
+	var depList appsv1.DeploymentList
+	if err := c.kubeClient.List(ctx, &depList); err != nil {
+		// failed to list, assume there must be one non-ready as it's harmless and just ensures we wait longer
+		return false
+	}
+	for _, ds := range depList.Items {
+		desired := ptr.Int32Value(ds.Spec.Replicas)
+		if ds.Spec.Replicas == nil {
+			// unspecified defaults to 1
+			desired = 1
+		}
+		if ds.Status.ReadyReplicas < desired || ds.Status.UpdatedReplicas < desired {
+			return false
+		}
+	}
+	return true
+}
+
 func (c *Controller) replicaSetsReady(ctx context.Context) bool {
 	var rsList appsv1.ReplicaSetList
 	if err := c.kubeClient.List(ctx, &rsList); err != nil {
 		// failed to list, assume there must be one non-ready as it's harmless and just ensures we wait longer
-		return true
+		return false
 	}
 	for _, rs := range rsList.Items {
 		desired := ptr.Int32Value(rs.Spec.Replicas)
@@ -599,7 +619,7 @@ func (c *Controller) replicationControllersReady(ctx context.Context) bool {
 	var rsList v1.ReplicationControllerList
 	if err := c.kubeClient.List(ctx, &rsList); err != nil {
 		// failed to list, assume there must be one non-ready as it's harmless and just ensures we wait longer
-		return true
+		return false
 	}
 	for _, rs := range rsList.Items {
 		desired := ptr.Int32Value(rs.Spec.Replicas)
@@ -618,15 +638,15 @@ func (c *Controller) statefulSetsReady(ctx context.Context) bool {
 	var sslist appsv1.StatefulSetList
 	if err := c.kubeClient.List(ctx, &sslist); err != nil {
 		// failed to list, assume there must be one non-ready as it's harmless and just ensures we wait longer
-		return true
+		return false
 	}
 	for _, rs := range sslist.Items {
 		desired := ptr.Int32Value(rs.Spec.Replicas)
 		if rs.Spec.Replicas == nil {
 			// unspecified defaults to 1
 			desired = 1
 		}
-		if rs.Status.ReadyReplicas < desired {
+		if rs.Status.ReadyReplicas < desired || rs.Status.UpdatedReplicas < desired {
 			return false
 		}
 	}
@@ -635,7 +655,7 @@ func (c *Controller) statefulSetsReady(ctx context.Context) bool {
 
 func (c *Controller) stabilizationWindow(ctx context.Context) time.Duration {
 	// no pending pods, and all replica sets/replication controllers are reporting ready so quickly consider another consolidation
-	if !c.hasPendingPods(ctx) && c.replicaSetsReady(ctx) &&
+	if !c.hasPendingPods(ctx) && c.deploymentsReady(ctx) && c.replicaSetsReady(ctx) &&
 		c.replicationControllersReady(ctx) && c.statefulSetsReady(ctx) {
 		return 0
 	}
```
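
The new `deploymentsReady` check (and the `UpdatedReplicas` condition added to `statefulSetsReady`) means the shortened stabilization window only applies once a rollout has fully converged: during a rolling update all pods can be Ready while some still run the old template, so `UpdatedReplicas` lags behind. The snippet below is a minimal, self-contained illustration of that predicate — a local sketch mirroring the shape of the checks above, not Karpenter's code:

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// settled mirrors the shape of the readiness checks in the diff above: a
// workload only counts as settled once both its ready and updated replica
// counts reach the desired count (defaulting to 1 when spec.replicas is unset).
func settled(d appsv1.Deployment) bool {
	desired := int32(1)
	if d.Spec.Replicas != nil {
		desired = *d.Spec.Replicas
	}
	return d.Status.ReadyReplicas >= desired && d.Status.UpdatedReplicas >= desired
}

func main() {
	replicas := int32(4)
	midRollout := appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "web"}, // hypothetical deployment
		Spec:       appsv1.DeploymentSpec{Replicas: &replicas},
		// All four pods are Ready, but only two run the new pod template.
		Status: appsv1.DeploymentStatus{ReadyReplicas: 4, UpdatedReplicas: 2},
	}
	// false: the rollout is still in progress, so consolidation keeps waiting
	// out the full stabilization window instead of reacting immediately.
	fmt.Println(settled(midRollout))
}
```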

‎pkg/controllers/provisioning/scheduling/scheduler.go

+3 −11
```diff
@@ -22,7 +22,6 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/multierr"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"knative.dev/pkg/logging"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -38,8 +37,6 @@ import (
 type SchedulerOptions struct {
 	// SimulationMode if true will prevent recording of the pod nomination decisions as events
 	SimulationMode bool
-	// ExcludeNodes are a list of node names that are excluded from existingNodes nodes for scheduling purposes.
-	ExcludeNodes []string
 }
 
 func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates []*scheduling.NodeTemplate,
@@ -82,7 +79,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeTemplates [
 		}
 	}
 
-	s.calculateExistingNodes(opts, namedNodeTemplates, stateNodes)
+	s.calculateExistingNodes(namedNodeTemplates, stateNodes)
 	return s
 }
 
@@ -228,14 +225,9 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
 	return errs
 }
 
-func (s *Scheduler) calculateExistingNodes(opts SchedulerOptions, namedNodeTemplates map[string]*scheduling.NodeTemplate, stateNodes []*state.Node) {
-	// create our in-flight nodes
-	excluded := sets.NewString(opts.ExcludeNodes...)
+func (s *Scheduler) calculateExistingNodes(namedNodeTemplates map[string]*scheduling.NodeTemplate, stateNodes []*state.Node) {
+	// create our existing nodes
 	for _, node := range stateNodes {
-		// skip any nodes that have been excluded
-		if excluded.Has(node.Node.Name) {
-			continue
-		}
 		name, ok := node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]
 		if !ok {
 			// ignoring this node as it wasn't launched by us
```

‎pkg/controllers/state/cluster.go

+10 −0
```diff
@@ -64,6 +64,7 @@ type Cluster struct {
 	// it previously couldn't occur.
 	consolidationState   int64
 	lastNodeDeletionTime int64
+	lastNodeCreationTime int64
 }
 
 func NewCluster(clk clock.Clock, cfg config.Config, client client.Client, cp cloudprovider.CloudProvider) *Cluster {
@@ -358,6 +359,10 @@ func (c *Cluster) updateNode(ctx context.Context, node *v1.Node) error {
 			atomic.StoreInt64(&c.lastNodeDeletionTime, nodeDeletionTime)
 		}
 	}
+	nodeCreationTime := node.CreationTimestamp.UnixMilli()
+	if nodeCreationTime > atomic.LoadInt64(&c.lastNodeCreationTime) {
+		atomic.StoreInt64(&c.lastNodeCreationTime, nodeCreationTime)
+	}
 	return nil
 }
 
@@ -381,6 +386,11 @@ func (c *Cluster) LastNodeDeletionTime() time.Time {
 	return time.UnixMilli(atomic.LoadInt64(&c.lastNodeDeletionTime))
 }
 
+// LastNodeCreationTime returns the last time that at a node was created.
+func (c *Cluster) LastNodeCreationTime() time.Time {
+	return time.UnixMilli(atomic.LoadInt64(&c.lastNodeCreationTime))
+}
+
 // deletePod is called when the pod has been deleted
 func (c *Cluster) deletePod(podKey types.NamespacedName) {
 	c.antiAffinityPods.Delete(podKey)
```
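
Together with the controller change above, the new `lastNodeCreationTime` bookkeeping means consolidation waits until both the most recent node deletion and the most recent node creation fall outside the stabilization window. A tiny sketch of that gate follows; the function name and signature here are illustrative, not Karpenter's API:

```go
package main

import (
	"fmt"
	"time"
)

// canConsolidate sketches the gate from controller.go above: consolidation
// only proceeds once the last node deletion *and* the last node creation are
// both older than the stabilization window.
func canConsolidate(now, lastDeletion, lastCreation time.Time, window time.Duration) bool {
	stabilizationTime := now.Add(-window)
	return lastDeletion.Before(stabilizationTime) && lastCreation.Before(stabilizationTime)
}

func main() {
	now := time.Now()
	window := 5 * time.Minute
	// A node was created 2 minutes ago: hold off even though the last deletion
	// was long ago, since the cluster just scaled up.
	fmt.Println(canConsolidate(now, now.Add(-30*time.Minute), now.Add(-2*time.Minute), window)) // false
	// Both events are comfortably outside the window: safe to evaluate consolidation.
	fmt.Println(canConsolidate(now, now.Add(-30*time.Minute), now.Add(-10*time.Minute), window)) // true
}
```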

‎website/content/en/preview/faq.md

+6 −0
````diff
@@ -190,3 +190,9 @@ error: error validating "provisioner.yaml": error validating data: ValidationErr
 ```
 
 The `startupTaints` parameter was added in v0.10.0. Helm upgrades do not upgrade the CRD describing the provisioner, so it must be done manually. For specific details, see the [Upgrade Guide]({{< ref "./upgrade-guide/#upgrading-to-v0100" >}})
+
+## Consolidation
+
+### Why do I sometimes see an extra node get launched when updating a deployment that remains empty and is later removed?
+
+Consolidation packs pods tightly onto nodes which can leave little free allocatable CPU/memory on your nodes. If a deployment uses a deployment strategy with a non-zero `maxSurge`, such as the default 25%, those surge pods may not have anywhere to run. In this case, Karpenter will launch a new node so that the surge pods can run and then remove it soon after if it's not needed.
````
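
For a concrete sense of the surge behaviour described in the new FAQ entry: with the default rolling-update strategy, `maxSurge` is a percentage of `spec.replicas` rounded up, so even a small deployment briefly runs at least one extra pod during an update. The arithmetic sketch below is illustrative only; it is not the deployment controller's code.

```go
package main

import (
	"fmt"
	"math"
)

// surgePods computes how many extra pods a rolling update may create when
// maxSurge is expressed as a percentage of the desired replica count.
// Kubernetes rounds the surge value up, so any non-zero percentage yields at
// least one surge pod.
func surgePods(replicas, maxSurgePercent int) int {
	return int(math.Ceil(float64(replicas) * float64(maxSurgePercent) / 100.0))
}

func main() {
	// With the default 25% maxSurge, a 4-replica deployment may briefly run a
	// 5th pod; on a tightly consolidated cluster that pod can have nowhere to
	// schedule, so Karpenter launches an extra node and removes it shortly
	// after the rollout completes.
	fmt.Println(surgePods(4, 25))  // 1
	fmt.Println(surgePods(10, 25)) // 3
}
```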
