Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: optimise pod finalizers with merge patch and resourceVersion #12862

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c3ab048
fix: optimise pod finalizers by using merge patch, and add a finalize…
Mar 30, 2024
2cb1444
fix: fix mistake of getting capture timeout env
Apr 4, 2024
aa8515e
Merge remote-tracking branch 'github/main' into mymain
Apr 10, 2024
4851db0
fix: optimise pod finalizers by using merge patch and resourceVersion
Apr 10, 2024
4d60dcf
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 10, 2024
dd0602d
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 10, 2024
58b4d7a
Merge remote-tracking branch 'github/main' into mymain
Apr 11, 2024
99a79ac
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 11, 2024
a51f618
Merge branch 'main' into master
imliuda Apr 12, 2024
2fde3a9
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 12, 2024
fa4775c
Merge branch 'main' into master
imliuda Apr 12, 2024
5c10335
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 12, 2024
7ed924e
Merge remote-tracking branch 'github/main' into mymain
Apr 14, 2024
f3467af
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
7776b6a
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
f6bedbe
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
8af5e02
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
842b1c3
Update workflow/controller/controller_test.go
imliuda Apr 17, 2024
f3d2785
Update workflow/controller/controller_test.go
imliuda Apr 17, 2024
7fbb562
Merge branch 'main' into master
imliuda Apr 17, 2024
0a2f983
Update workflow/controller/controller_test.go
imliuda Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions workflow/common/common.go
Expand Up @@ -163,6 +163,8 @@ const (
EnvVarProgressFile = "ARGO_PROGRESS_FILE"
// EnvVarDefaultRequeueTime is the default requeue time for Workflow Informers. For more info, see rate_limiters.go
EnvVarDefaultRequeueTime = "DEFAULT_REQUEUE_TIME"
// EnvVarPodStatusCaptureFinalizer is used to prevent pod garbage collected before argo captures its exit status
EnvVarPodStatusCaptureFinalizer = "ARGO_POD_STATUS_CAPTURE_FINALIZER"
// EnvAgentTaskWorkers is the number of task workers for the agent pod
EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS"
// EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet
Expand Down
125 changes: 64 additions & 61 deletions workflow/controller/controller.go
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
apiwatch "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3"
Expand All @@ -52,6 +51,7 @@ import (
"github.com/argoproj/argo-workflows/v3/util/diff"
"github.com/argoproj/argo-workflows/v3/util/env"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
"github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
Expand Down Expand Up @@ -153,12 +153,6 @@ type WorkflowController struct {
recentCompletions recentCompletions
}

type PatchOperation struct {
Operation string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}

const (
workflowResyncPeriod = 20 * time.Minute
workflowTemplateResyncPeriod = 20 * time.Minute
Expand Down Expand Up @@ -522,6 +516,56 @@ func (wfc *WorkflowController) runPodCleanup(ctx context.Context) {
}
}

func (wfc *WorkflowController) getPodCleanupPatch(pod *apiv1.Pod, labelPodCompleted bool) ([]byte, error) {
un := unstructured.Unstructured{}
if labelPodCompleted {
un.SetLabels(map[string]string{common.LabelKeyCompleted: "true"})
}

finalizerEnabled := os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true"
if finalizerEnabled && pod.Finalizers != nil {
finalizers := slice.RemoveString(pod.Finalizers, common.FinalizerPodStatus)
if len(finalizers) != len(pod.Finalizers) {
un.SetFinalizers(finalizers)
un.SetResourceVersion(pod.ObjectMeta.ResourceVersion)
}
}

// if there was nothing to patch (no-op)
if len(un.Object) == 0 {
imliuda marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

return un.MarshalJSON()
}

func (wfc *WorkflowController) patchPodForCleanup(ctx context.Context, pods typedv1.PodInterface, namespace, podName string, labelPodCompleted bool) error {
pod, err := wfc.getPod(namespace, podName)
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
// err is always nil in all kind of caches for now
if err != nil {
return err
}
// if pod is nil, it must have been deleted
if pod == nil {
return nil
}

patch, err := wfc.getPodCleanupPatch(pod, labelPodCompleted)
if err != nil {
return err
}
if patch == nil {
return nil
}

_, err = pods.Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil && !apierr.IsNotFound(err) {
return err
}

return nil
}

// all pods will ultimately be cleaned up by either deleting them, or labelling them
func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bool {
key, quit := wfc.podCleanupQueue.Get()
Expand All @@ -540,7 +584,7 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
err := func() error {
switch action {
case terminateContainers:
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending {
wfc.queuePodForCleanup(namespace, podName, deletePod)
} else if terminationGracePeriod, err := wfc.signalContainers(ctx, namespace, podName, syscall.SIGTERM); err != nil {
Expand All @@ -554,12 +598,12 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
}
case labelPodCompleted:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, true); err != nil {
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, true); err != nil {
return err
}
case deletePod:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, false); err != nil {
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil {
return err
}
propagation := metav1.DeletePropagationBackground
Expand All @@ -570,27 +614,24 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
if err != nil && !apierr.IsNotFound(err) {
return err
}
case removeFinalizer:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.patchPodForCleanup(ctx, pods, namespace, podName, false); err != nil {
return err
}
}
return nil
}()
if err != nil {
logCtx.WithError(err).Warn("failed to clean-up pod")
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err) || apierr.IsConflict(err) {
wfc.podCleanupQueue.AddRateLimited(key)
}
}
return true
}

func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) {
pod, err := wfc.kubeclientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, err
}
return pod, nil
}

func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) {
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return nil, err
Expand All @@ -605,46 +646,8 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string)
return pod, nil
}

func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, isCompleted bool) error {
// Get current Pod from K8S and update it to remove finalizer, and if the Pod was completed, set the Label
// In the case that the Pod changed in between Get and Update, we'll get a conflict error and can try again
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
currentPod, err := wfc.getPodFromAPI(ctx, namespace, podName)
if err != nil {
return err
}
updatedPod := currentPod.DeepCopy()

if isCompleted {
if updatedPod.Labels == nil {
updatedPod.Labels = make(map[string]string)
}
updatedPod.Labels[common.LabelKeyCompleted] = "true"
}

updatedPod.Finalizers = removeFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus)

_, err = pods.Update(ctx, updatedPod, metav1.UpdateOptions{})
return err
})
if err != nil {
return err
}
return nil
}

func removeFinalizer(finalizers []string, targetFinalizer string) []string {
var updatedFinalizers []string
for _, finalizer := range finalizers {
if finalizer != targetFinalizer {
updatedFinalizers = append(updatedFinalizers, finalizer)
}
}
return updatedFinalizers
}

func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return 0, err
}
Expand Down Expand Up @@ -1054,8 +1057,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
log.WithError(err).Error("Failed to list pods")
}
for _, p := range podList.Items {
if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name, false); err != nil {
log.WithError(err).Error("Failed to enable pod for deletion")
if slice.ContainsString(p.Finalizers, common.FinalizerPodStatus) {
wfc.queuePodForCleanup(p.Namespace, p.Name, removeFinalizer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would deletePod not make sense here? as opposed to creating a new removeFinalizer action?

Copy link
Contributor Author

@imliuda imliuda Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is caused by this case:

WaitForPod(fixtures.PodCompleted)

Workflow get deleted, so pod get deleted, too, so waiting pod completed wouldn't happen, but I have not found who deleted it and when. About the failure in Windows Tests, I am testing it in a virtual machine, but, i am struggling with network problems(see my location...). I need some help.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can ignore the Windows tests, they're not required to pass as they're still unstable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would deletePod not make sense here? as opposed to creating a new removeFinalizer action?

So by far, if I should change it back to deletePod, or keep current solution?

}
}

Expand Down
49 changes: 47 additions & 2 deletions workflow/controller/controller_test.go
Expand Up @@ -2,15 +2,15 @@ package controller

import (
"context"
"os"
imliuda marked this conversation as resolved.
Show resolved Hide resolved
"testing"
"time"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/argoproj/pkg/sync"
"github.com/stretchr/testify/assert"
authorizationv1 "k8s.io/api/authorization/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -1181,6 +1181,51 @@ spec:
assert.Len(t, pods.Items, 0)
}

func TestPodCleaupPatch(t *testing.T) {
wfc := &WorkflowController{}

pod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{common.LabelKeyCompleted: "false"},
Finalizers: []string{common.FinalizerPodStatus},
ResourceVersion: "123456",
},
}

os.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true")
imliuda marked this conversation as resolved.
Show resolved Hide resolved

// pod finalizer enabled, patch label
patch, err := wfc.getPodCleanupPatch(pod, true)
assert.Nil(t, err)
expected := `{"metadata":{"resourceVersion":"123456","finalizers":[],"labels":{"workflows.argoproj.io/completed":"true"}}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer enabled, do not patch label
patch, err = wfc.getPodCleanupPatch(pod, false)
assert.Nil(t, err)
expected = `{"metadata":{"resourceVersion":"123456","finalizers":[]}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer enabled, do not patch label, nil/empty finalizers
podWithNilFinalizers := &apiv1.Pod{}
patch, err = wfc.getPodCleanupPatch(podWithNilFinalizers, false)
assert.Nil(t, err)
assert.Nil(t, patch)

os.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false")
imliuda marked this conversation as resolved.
Show resolved Hide resolved

// pod finalizer disabled, patch both
patch, err = wfc.getPodCleanupPatch(pod, true)
assert.Nil(t, err)
expected = `{"metadata":{"labels":{"workflows.argoproj.io/completed":"true"}}}`
assert.JSONEq(t, expected, string(patch))

// pod finalizer disabled, do not patch label
patch, err = wfc.getPodCleanupPatch(pod, false)
assert.Nil(t, err)
assert.Nil(t, patch)
}

func TestPendingPodWhenTerminate(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
wf.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Expand Up @@ -2629,7 +2629,7 @@ func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, erro
}

podName := woc.getPodName(node.Name, node.TemplateName)
return woc.controller.getPodFromCache(woc.wf.GetNamespace(), podName)
return woc.controller.getPod(woc.wf.GetNamespace(), podName)
}

func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/pod_cleanup_key.go
Expand Up @@ -19,6 +19,7 @@ const (
labelPodCompleted podCleanupAction = "labelPodCompleted"
terminateContainers podCleanupAction = "terminateContainers"
killContainers podCleanupAction = "killContainers"
removeFinalizer podCleanupAction = "removeFinalizer"
)

func newPodCleanupKey(namespace string, podName string, action podCleanupAction) podCleanupKey {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Expand Up @@ -188,7 +188,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
},
}

if os.Getenv("ARGO_POD_STATUS_CAPTURE_FINALIZER") == "true" {
if os.Getenv(common.EnvVarPodStatusCaptureFinalizer) == "true" {
pod.ObjectMeta.Finalizers = append(pod.ObjectMeta.Finalizers, common.FinalizerPodStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod_test.go
Expand Up @@ -1809,7 +1809,7 @@ func TestPodExists(t *testing.T) {
}

func TestPodFinalizerExits(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "true")
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "true")
cancel, controller := newController()
defer cancel()

Expand All @@ -1827,7 +1827,7 @@ func TestPodFinalizerExits(t *testing.T) {
}

func TestPodFinalizerDoesNotExist(t *testing.T) {
t.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "false")
t.Setenv(common.EnvVarPodStatusCaptureFinalizer, "false")
cancel, controller := newController()
defer cancel()

Expand Down