Skip to content

Commit 958b404

Browse files
authoredAug 31, 2022
fix: reduce pod nomination event counts (#2384)
Reduces API server load during large scale-ups.
1 parent dff4312 commit 958b404

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed
 

‎pkg/controllers/controllers.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ func Initialize(injectCloudProvider func(context.Context, cloudprovider.Options)
137137
}
138138

139139
realClock := clock.RealClock{}
140-
recorder := events.NewDedupeRecorder(events.NewRecorder(manager.GetEventRecorderFor(appName)))
140+
recorder := events.NewRecorder(manager.GetEventRecorderFor(appName))
141+
recorder = events.NewLoadSheddingRecorder(recorder)
142+
recorder = events.NewDedupeRecorder(recorder)
143+
141144
cluster := state.NewCluster(realClock, cfg, manager.GetClient(), cloudProvider)
142145
provisioner := provisioning.NewProvisioner(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster)
143146
consolidation.NewController(ctx, realClock, manager.GetClient(), provisioner, cloudProvider, recorder, cluster, manager.Elected())

‎pkg/events/loadshedding.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Licensed under the Apache License, Version 2.0 (the "License");
3+
you may not use this file except in compliance with the License.
4+
You may obtain a copy of the License at
5+
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
8+
Unless required by applicable law or agreed to in writing, software
9+
distributed under the License is distributed on an "AS IS" BASIS,
10+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
See the License for the specific language governing permissions and
12+
limitations under the License.
13+
*/
14+
15+
package events
16+
17+
import (
18+
v1 "k8s.io/api/core/v1"
19+
"k8s.io/client-go/util/flowcontrol"
20+
)
21+
22+
func NewLoadSheddingRecorder(r Recorder) Recorder {
23+
return &loadshedding{
24+
rec: r,
25+
nominationBucket: flowcontrol.NewTokenBucketRateLimiter(5, 10),
26+
}
27+
}
28+
29+
type loadshedding struct {
30+
rec Recorder
31+
nominationBucket flowcontrol.RateLimiter
32+
}
33+
34+
func (l *loadshedding) NominatePod(pod *v1.Pod, node *v1.Node) {
35+
// Pod nominations occur very often, especially in large scale-ups. They normally aren't particularly useful
36+
// during a scaleup, but are useful when at a steady state where we have a bug and think a pod will schedule
37+
// that actually won't. This prevents us from hammering the API server with events that likely aren't useful
38+
// which can slow down node creation or result in events being dropped anyway by the K8s client.
39+
if !l.nominationBucket.TryAccept() {
40+
return
41+
}
42+
l.rec.NominatePod(pod, node)
43+
}
44+
45+
func (l *loadshedding) EvictPod(pod *v1.Pod) {
46+
l.rec.EvictPod(pod)
47+
}
48+
49+
func (l *loadshedding) PodFailedToSchedule(pod *v1.Pod, err error) {
50+
l.rec.PodFailedToSchedule(pod, err)
51+
}
52+
53+
func (l *loadshedding) NodeFailedToDrain(node *v1.Node, err error) {
54+
l.rec.NodeFailedToDrain(node, err)
55+
}
56+
57+
func (l *loadshedding) TerminatingNodeForConsolidation(node *v1.Node, reason string) {
58+
l.rec.TerminatingNodeForConsolidation(node, reason)
59+
}
60+
61+
func (l *loadshedding) LaunchingNodeForConsolidation(node *v1.Node, reason string) {
62+
l.rec.LaunchingNodeForConsolidation(node, reason)
63+
}
64+
65+
func (l *loadshedding) WaitingOnReadinessForConsolidation(node *v1.Node) {
66+
l.rec.WaitingOnReadinessForConsolidation(node)
67+
}
68+
69+
func (l *loadshedding) WaitingOnDeletionForConsolidation(node *v1.Node) {
70+
l.rec.WaitingOnDeletionForConsolidation(node)
71+
}

0 commit comments

Comments
 (0)
Please sign in to comment.