Skip to content

Commit

Permalink
add metadata to kubelet eviction event annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed May 23, 2018
1 parent 8f4674d commit fd1f19f
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 32 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/testutil/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ func (f *FakeRecorder) Eventf(obj runtime.Object, eventtype, reason, messageFmt
func (f *FakeRecorder) PastEventf(obj runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}

// AnnotatedEventf emits a fake formatted event to the fake recorder
func (f *FakeRecorder) AnnotatedEventf(obj runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
f.Eventf(obj, eventtype, reason, messageFmt, args)
}

func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
f.Lock()
defer f.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/kubelet/container/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ func (irecorder *innerEventRecorder) PastEventf(object runtime.Object, timestamp
}
}

func (irecorder *innerEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
if ref, ok := irecorder.shouldRecordEvent(object); ok {
irecorder.recorder.AnnotatedEventf(ref, annotations, eventtype, reason, messageFmt, args...)
}

}

// Pod must not be nil.
func IsHostNetworkPod(pod *v1.Pod) bool {
return pod.Spec.HostNetwork
Expand Down
13 changes: 7 additions & 6 deletions pkg/kubelet/eviction/eviction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
if m.evictPod(pod, gracePeriodOverride, evictionMessage(resourceToReclaim, pod, statsFunc)) {
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
if m.evictPod(pod, gracePeriodOverride, message, annotations) {
return []*v1.Pod{pod}
}
}
Expand Down Expand Up @@ -534,7 +535,7 @@ func (m *managerImpl) emptyDirLimitEviction(podStats statsapi.PodStats, pod *v1.
used := podVolumeUsed[pod.Spec.Volumes[i].Name]
if used != nil && size != nil && size.Sign() == 1 && used.Cmp(*size) > 0 {
// the emptyDir usage exceeds the size limit, evict the pod
return m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessage, pod.Spec.Volumes[i].Name, size.String()))
return m.evictPod(pod, 0, fmt.Sprintf(emptyDirMessage, pod.Spec.Volumes[i].Name, size.String()), nil)
}
}
}
Expand Down Expand Up @@ -566,7 +567,7 @@ func (m *managerImpl) podEphemeralStorageLimitEviction(podStats statsapi.PodStat
podEphemeralStorageLimit := podLimits[v1.ResourceEphemeralStorage]
if podEphemeralStorageTotalUsage.Cmp(podEphemeralStorageLimit) > 0 {
// the total usage of pod exceeds the total size limit of containers, evict the pod
return m.evictPod(pod, 0, fmt.Sprintf(podEphemeralStorageMessage, podEphemeralStorageLimit.String()))
return m.evictPod(pod, 0, fmt.Sprintf(podEphemeralStorageMessage, podEphemeralStorageLimit.String()), nil)
}
return false
}
Expand All @@ -588,15 +589,15 @@ func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.P

if ephemeralStorageThreshold, ok := thresholdsMap[containerStat.Name]; ok {
if ephemeralStorageThreshold.Cmp(*containerUsed) < 0 {
return m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessage, containerStat.Name, ephemeralStorageThreshold.String()))
return m.evictPod(pod, 0, fmt.Sprintf(containerEphemeralStorageMessage, containerStat.Name, ephemeralStorageThreshold.String()), nil)

}
}
}
return false
}

func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string) bool {
func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string) bool {
// If the pod is marked as critical and static, and support for critical pod annotations is enabled,
// do not evict such pods. Static pods are not re-admitted after evictions.
// https://github.com/kubernetes/kubernetes/issues/40573 has more details.
Expand All @@ -611,7 +612,7 @@ func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg
Reason: Reason,
}
// record that we are evicting the pod
m.recorder.Eventf(pod, v1.EventTypeWarning, Reason, evictMsg)
m.recorder.AnnotatedEventf(pod, annotations, v1.EventTypeWarning, Reason, evictMsg)
// this is a blocking call and should only return when the pod and its containers are killed.
err := m.killPodFunc(pod, status, &gracePeriodOverride)
if err != nil {
Expand Down
24 changes: 19 additions & 5 deletions pkg/kubelet/eviction/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ const (
// this prevents constantly updating the memcg notifier if synchronize
// is run frequently.
notifierRefreshInterval = 10 * time.Second
// OffendingContainersKey is the key in eviction event annotations for the list of container names which exceeded their requests
OffendingContainersKey = "offending_containers"
// OffendingContainersUsageKey is the key in eviction event annotations for the list of usage of containers which exceeded their requests
OffendingContainersUsageKey = "offending_containers_usage"
// StarvedResourceKey is the key for the starved resource in eviction event annotations
StarvedResourceKey = "starved_resource"
)

var (
Expand Down Expand Up @@ -1053,12 +1059,15 @@ func buildSignalToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, wit
return signalToReclaimFunc
}

// evictionMessage constructs a useful message about why an eviction occurred
func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats statsFunc) string {
message := fmt.Sprintf(message, resourceToReclaim)
// evictionMessage constructs a useful message about why an eviction occurred, and annotations to provide metadata about the eviction
func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats statsFunc) (message string, annotations map[string]string) {
annotations = make(map[string]string)
message = fmt.Sprintf(message, resourceToReclaim)
containers := []string{}
containerUsage := []string{}
podStats, ok := stats(pod)
if !ok {
return message
return
}
for _, containerStats := range podStats.Containers {
for _, container := range pod.Spec.Containers {
Expand All @@ -1077,11 +1086,16 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats
}
if usage != nil && usage.Cmp(requests) > 0 {
message += fmt.Sprintf(containerMessage, container.Name, usage.String(), requests.String())
containers = append(containers, container.Name)
containerUsage = append(containerUsage, usage.String())
}
}
}
}
return message
annotations[OffendingContainersKey] = strings.Join(containers, ",")
annotations[OffendingContainersUsageKey] = strings.Join(containerUsage, ",")
annotations[StarvedResourceKey] = string(resourceToReclaim)
return
}

// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed
Expand Down
22 changes: 15 additions & 7 deletions staging/src/k8s.io/client-go/tools/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type EventRecorder interface {

// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
Expand Down Expand Up @@ -250,7 +253,7 @@ type recorderImpl struct {
clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
Expand All @@ -262,7 +265,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp met
return
}

event := recorder.makeEvent(ref, eventtype, reason, message)
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source

go func() {
Expand All @@ -281,27 +284,32 @@ func validateEventType(eventtype string) bool {
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, metav1.Now(), eventtype, reason, message)
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Namespace: namespace,
Annotations: annotations,
},
InvolvedObject: *ref,
Reason: reason,
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/client-go/tools/record/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (f *FakeRecorder) Eventf(object runtime.Object, eventtype, reason, messageF
func (f *FakeRecorder) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
}

func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
f.Eventf(object, eventtype, reason, messageFmt, args)
}

// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {
Expand Down
1 change: 1 addition & 0 deletions test/e2e_node/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
Expand Down

0 comments on commit fd1f19f

Please sign in to comment.