Automated cherry pick of #123809: fix stateful set pod recreation and event spam #124564

Open · wants to merge 4 commits into base: release-1.29

Changes from all commits

pkg/controller/statefulset/stateful_set.go (4 changes: 3 additions & 1 deletion)

```diff
@@ -98,7 +98,6 @@ func NewStatefulSetController(
                recorder),
            NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
            history.NewHistory(kubeClient, revInformer.Lister()),
-           recorder,
        ),
        pvcListerSynced: pvcInformer.Informer().HasSynced,
        revListerSynced: revInformer.Informer().HasSynced,
```

```diff
@@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interface{}) {
        return
    }
    logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
+   if oldPod.Status.Phase != curPod.Status.Phase {
+       logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase)
+   }
    ssc.enqueueStatefulSet(set)
    // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
    // the Pod status which in turn will trigger a requeue of the owning replica set thus
```
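
For context on this first file: the three added lines log only when the pod's phase actually changed between the old and current objects, and only at verbosity 4, so routine metadata-only updates stay quiet. Below is a minimal standalone sketch of the same guard; the `logPhaseChange` helper, the pod fixtures, and the `klog.NewKlogr()` setup are illustrative and not part of this patch.

```go
package main

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
)

// logPhaseChange mirrors the guard added in updatePod: it logs only
// when the phase differs between the old and current pod objects.
func logPhaseChange(logger klog.Logger, oldPod, curPod *v1.Pod) {
    if oldPod.Status.Phase != curPod.Status.Phase {
        logger.V(4).Info("StatefulSet Pod phase changed",
            "pod", klog.KObj(curPod), "podPhase", curPod.Status.Phase)
    }
}

func main() {
    logger := klog.NewKlogr()
    oldPod := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
    curPod := oldPod.DeepCopy()
    curPod.Status.Phase = v1.PodFailed
    logPhaseChange(logger, oldPod, curPod) // emitted only when -v=4 or higher is enabled
    logPhaseChange(logger, curPod, curPod) // no output: phase unchanged
}
```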

pkg/controller/statefulset/stateful_set_control.go (43 changes: 10 additions & 33 deletions)

```diff
@@ -27,7 +27,6 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
-   "k8s.io/client-go/tools/record"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/controller/history"
    "k8s.io/kubernetes/pkg/features"
```

```diff
@@ -62,16 +61,14 @@ type StatefulSetControlInterface interface {
 func NewDefaultStatefulSetControl(
    podControl *StatefulPodControl,
    statusUpdater StatefulSetStatusUpdaterInterface,
-   controllerHistory history.Interface,
-   recorder record.EventRecorder) StatefulSetControlInterface {
-   return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
+   controllerHistory history.Interface) StatefulSetControlInterface {
+   return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
 }

 type defaultStatefulSetControl struct {
    podControl        *StatefulPodControl
    statusUpdater     StatefulSetStatusUpdaterInterface
    controllerHistory history.Interface
-   recorder          record.EventRecorder
 }

 // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
```
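
The constructor now takes three collaborators instead of four. Pod create/delete events are still recorded, but by StatefulPodControl, which keeps its own recorder (see the context lines in stateful_set.go above); the control loop itself no longer records anything. A toy sketch of the narrowed wiring follows; the stand-in types are placeholders for the real interfaces so the sketch compiles on its own.

```go
package main

import "fmt"

// Stand-ins for the real collaborators (StatefulPodControl,
// StatefulSetStatusUpdaterInterface, history.Interface).
type podControl struct{}
type statusUpdater struct{}
type controllerHistory struct{}

// defaultControl mirrors the slimmed-down struct: three fields, no recorder.
type defaultControl struct {
    podControl        *podControl
    statusUpdater     *statusUpdater
    controllerHistory *controllerHistory
}

func newDefaultControl(pc *podControl, su *statusUpdater, ch *controllerHistory) *defaultControl {
    return &defaultControl{pc, su, ch}
}

func main() {
    c := newDefaultControl(&podControl{}, &statusUpdater{}, &controllerHistory{})
    fmt.Printf("control wired without a recorder: %#v\n", *c)
}
```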

```diff
@@ -368,45 +365,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
 func (ssc *defaultStatefulSetControl) processReplica(
    ctx context.Context,
    set *apps.StatefulSet,
-   currentRevision *apps.ControllerRevision,
-   updateRevision *apps.ControllerRevision,
-   currentSet *apps.StatefulSet,
    updateSet *apps.StatefulSet,
    monotonic bool,
    replicas []*v1.Pod,
    i int) (bool, error) {
    logger := klog.FromContext(ctx)
-   // Delete and recreate pods which finished running.
-   //
+
    // Note that pods with phase Succeeded will also trigger this event. This is
    // because final pod phase of evicted or otherwise forcibly stopped pods
    // (e.g. terminated on node reboot) is determined by the exit code of the
    // container, not by the reason for pod termination. We should restart the pod
    // regardless of the exit code.
    if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
-       if isFailed(replicas[i]) {
-           ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
-               "StatefulSet %s/%s is recreating failed Pod %s",
-               set.Namespace,
-               set.Name,
-               replicas[i].Name)
-       } else {
-           ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
-               "StatefulSet %s/%s is recreating terminated Pod %s",
-               set.Namespace,
-               set.Name,
-               replicas[i].Name)
-       }
-       if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
-           return true, err
+       if replicas[i].DeletionTimestamp == nil {
+           if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
+               return true, err
+           }
        }
-       replicaOrd := i + getStartOrdinal(set)
-       replicas[i] = newVersionedStatefulSetPod(
-           currentSet,
-           updateSet,
-           currentRevision.Name,
-           updateRevision.Name,
-           replicaOrd)
+       // New pod should be generated on the next sync after the current pod is removed from etcd.
+       return true, nil
    }
    // If we find a Pod that has not been created we create the Pod
    if !isCreated(replicas[i]) {
```
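
This hunk is the heart of #123809. Previously, a Failed or Succeeded pod was deleted and its replacement object was built immediately in the same pass via newVersionedStatefulSetPod; if the delete had not completed yet, the next sync repeated the whole dance and re-emitted events each time. Now the delete is issued at most once (guarded by DeletionTimestamp) and the function returns, leaving recreation to a later sync, which finds the missing ordinal and recreates the pod through the existing `!isCreated` path. Below is a simplified two-sync simulation of that flow; the `syncOnce` helper and the map-based pod store are illustrative stand-ins, not the controller's real data structures.

```go
package main

import "fmt"

type pod struct {
    name    string
    phase   string
    deleted bool // stands in for a pending DeletionTimestamp
}

// syncOnce is a toy version of the per-replica loop: finished pods are
// deleted once and the sync ends; missing ordinals are recreated on a
// later pass, mirroring the patched processReplica flow.
func syncOnce(store map[int]*pod) string {
    for ord := 0; ord < 3; ord++ {
        p, ok := store[ord]
        if !ok {
            store[ord] = &pod{name: fmt.Sprintf("web-%d", ord), phase: "Running"}
            return fmt.Sprintf("recreated web-%d", ord)
        }
        if p.phase == "Failed" || p.phase == "Succeeded" {
            if !p.deleted {
                p.deleted = true // issue the delete exactly once
            }
            return fmt.Sprintf("deleting web-%d, recreate deferred", ord)
        }
    }
    return "steady state"
}

func main() {
    store := map[int]*pod{
        0: {name: "web-0", phase: "Running"},
        1: {name: "web-1", phase: "Failed"},
        2: {name: "web-2", phase: "Running"},
    }
    fmt.Println(syncOnce(store)) // sync 1: delete issued, no recreate yet
    delete(store, 1)             // etcd finishes the deletion
    fmt.Println(syncOnce(store)) // sync 2: web-1 recreated via the create path
}
```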

```diff
@@ -638,7 +615,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

    // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
    processReplicaFn := func(i int) (bool, error) {
-       return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
+       return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
    }
    if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
        updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
```
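
The net user-visible effect of this cherry-pick on release-1.29 is that the per-sync RecreatingFailedPod / RecreatingTerminatedPod events disappear and a finished pod is deleted only once per termination. A small runnable sketch of the idempotency guard in isolation follows; `maybeDelete` and the counting `deleteFn` are illustrative, and only the DeletionTimestamp check mirrors the patched code.

```go
package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// maybeDelete mirrors the new guard in processReplica: issue the delete
// only if one is not already pending, so repeated syncs while the pod
// terminates stay quiet instead of re-deleting and re-emitting events.
func maybeDelete(pod *v1.Pod, deleteFn func(*v1.Pod) error) error {
    if pod.DeletionTimestamp == nil {
        return deleteFn(pod)
    }
    return nil // deletion already in flight: nothing to do this sync
}

func main() {
    deletes := 0
    deleteFn := func(p *v1.Pod) error {
        deletes++
        now := metav1.NewTime(time.Now())
        p.DeletionTimestamp = &now // the API server would set this
        return nil
    }
    pod := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodFailed}}
    for i := 0; i < 5; i++ { // five syncs while the pod terminates
        _ = maybeDelete(pod, deleteFn)
    }
    fmt.Println("delete calls across 5 syncs:", deletes) // 1, not 5
}
```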