Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: optimise pod finalizers with merge patch and resourceVersion #12862

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c3ab048
fix: optimise pod finalizers by using merge patch, and add a finalize…
Mar 30, 2024
2cb1444
fix: fix mistake of getting capture timeout env
Apr 4, 2024
aa8515e
Merge remote-tracking branch 'github/main' into mymain
Apr 10, 2024
4851db0
fix: optimise pod finalizers by using merge patch and resourceVersion
Apr 10, 2024
4d60dcf
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 10, 2024
dd0602d
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 10, 2024
58b4d7a
Merge remote-tracking branch 'github/main' into mymain
Apr 11, 2024
99a79ac
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 11, 2024
a51f618
Merge branch 'main' into master
imliuda Apr 12, 2024
2fde3a9
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 12, 2024
fa4775c
Merge branch 'main' into master
imliuda Apr 12, 2024
5c10335
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 12, 2024
7ed924e
Merge remote-tracking branch 'github/main' into mymain
Apr 14, 2024
f3467af
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
7776b6a
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
f6bedbe
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
8af5e02
fix: optimise pod finalizers by using merge patch and resourceVersion…
Apr 14, 2024
842b1c3
Update workflow/controller/controller_test.go
imliuda Apr 17, 2024
f3d2785
Update workflow/controller/controller_test.go
imliuda Apr 17, 2024
7fbb562
Merge branch 'main' into master
imliuda Apr 17, 2024
0a2f983
Update workflow/controller/controller_test.go
imliuda Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 63 additions & 64 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import (
"k8s.io/client-go/dynamic"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
apiwatch "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3"
Expand All @@ -52,6 +50,7 @@ import (
"github.com/argoproj/argo-workflows/v3/util/diff"
"github.com/argoproj/argo-workflows/v3/util/env"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
"github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
Expand Down Expand Up @@ -153,12 +152,6 @@ type WorkflowController struct {
recentCompletions recentCompletions
}

// PatchOperation describes one patch operation that is serialized to JSON
// with the keys "op", "path" and (only when set) "value".
// NOTE(review): the field layout looks like a JSON Patch (RFC 6902) entry — confirm against callers.
type PatchOperation struct {
Operation string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}

const (
workflowResyncPeriod = 20 * time.Minute
workflowTemplateResyncPeriod = 20 * time.Minute
Expand Down Expand Up @@ -522,6 +515,58 @@ func (wfc *WorkflowController) runPodCleanup(ctx context.Context) {
}
}

// getPodCleanupPatch builds the JSON patch body used when cleaning up a pod.
// It returns (nil, nil) when there is nothing to patch.
//
// The patch may contain:
//   - the common.LabelKeyCompleted label set to "true", when labelPodCompleted is true;
//   - the pod's finalizers with common.FinalizerPodStatus removed, together with the
//     pod's current resourceVersion, when the ARGO_POD_STATUS_CAPTURE_FINALIZER
//     environment variable is "true" and the pod still carries that finalizer.
func (wfc *WorkflowController) getPodCleanupPatch(pod *apiv1.Pod, labelPodCompleted bool) ([]byte, error) {
un := unstructured.Unstructured{}
if labelPodCompleted {
un.SetLabels(map[string]string{common.LabelKeyCompleted: "true"})
}

finalizerEnabled := os.Getenv("ARGO_POD_STATUS_CAPTURE_FINALIZER") == "true"
imliuda marked this conversation as resolved.
Show resolved Hide resolved
if finalizerEnabled && slice.ContainsString(pod.Finalizers, common.FinalizerPodStatus) {
un.SetFinalizers(slice.RemoveString(pod.Finalizers, common.FinalizerPodStatus))
imliuda marked this conversation as resolved.
Show resolved Hide resolved
// NOTE(review): presumably the resourceVersion is included so the API server
// rejects the patch with a conflict if the pod changed since this copy was
// read — confirm.
un.SetResourceVersion(pod.ObjectMeta.ResourceVersion)
}

// neither the label nor the finalizers were set above, so there is nothing to patch
if len(un.Object) == 0 {
imliuda marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

return un.MarshalJSON()
}

// patchPodForCleanup looks up the pod via wfc.getPod, builds a cleanup patch
// with getPodCleanupPatch and applies it as a merge patch.
//
// It returns nil without calling the API when the pod is not found by
// wfc.getPod or when there is nothing to patch. A NotFound error from the
// Patch call is also treated as success; any other error is returned.
func (wfc *WorkflowController) patchPodForCleanup(ctx context.Context, namespace, podName string, labelPodCompleted bool) error {
pod, err := wfc.getPod(namespace, podName)
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
// err is currently always nil for every kind of cache
if err != nil {
return err
}
// if pod is nil, it must have been deleted
if pod == nil {
return nil
}

patch, err := wfc.getPodCleanupPatch(pod, labelPodCompleted)
if err != nil {
return err
}
// a nil patch means there is nothing to change on the pod
if patch == nil {
return nil
}

_, err = wfc.kubeclientset.CoreV1().Pods(namespace).Patch(
ctx,
podName,
types.MergePatchType,
patch,
metav1.PatchOptions{},
)
// a pod that no longer exists needs no cleanup, so NotFound is not an error here
if err != nil && !apierr.IsNotFound(err) {
return err
}

return nil
}

// all pods will ultimately be cleaned up by either deleting them, or labelling them
func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bool {
key, quit := wfc.podCleanupQueue.Get()
Expand All @@ -538,9 +583,10 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
logCtx := log.WithFields(log.Fields{"key": key, "action": action})
logCtx.Info("cleaning up pod")
err := func() error {
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
switch action {
case terminateContainers:
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if err == nil && pod != nil && pod.Status.Phase == apiv1.PodPending {
wfc.queuePodForCleanup(namespace, podName, deletePod)
} else if terminationGracePeriod, err := wfc.signalContainers(ctx, namespace, podName, syscall.SIGTERM); err != nil {
Expand All @@ -553,15 +599,14 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
return err
}
case labelPodCompleted:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, true); err != nil {
if err := wfc.patchPodForCleanup(ctx, namespace, podName, true); err != nil {
return err
}
case deletePod:
pods := wfc.kubeclientset.CoreV1().Pods(namespace)
if err := wfc.enablePodForDeletion(ctx, pods, namespace, podName, false); err != nil {
if err := wfc.patchPodForCleanup(ctx, namespace, podName, false); err != nil {
return err
}

propagation := metav1.DeletePropagationBackground
err := pods.Delete(ctx, podName, metav1.DeleteOptions{
PropagationPolicy: &propagation,
Expand All @@ -575,22 +620,14 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
}()
if err != nil {
logCtx.WithError(err).Warn("failed to clean-up pod")
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err) || apierr.IsConflict(err) {
wfc.podCleanupQueue.AddRateLimited(key)
}
}
return true
}

// getPodFromAPI reads the named pod straight from the Kubernetes API using
// wfc.kubeclientset. On failure it returns a nil pod together with the error.
func (wfc *WorkflowController) getPodFromAPI(ctx context.Context, namespace string, podName string) (*apiv1.Pod, error) {
	podsClient := wfc.kubeclientset.CoreV1().Pods(namespace)
	fetched, getErr := podsClient.Get(ctx, podName, metav1.GetOptions{})
	if getErr == nil {
		return fetched, nil
	}
	// never hand back the client's result object alongside an error
	return nil, getErr
}

func (wfc *WorkflowController) getPodFromCache(namespace string, podName string) (*apiv1.Pod, error) {
func (wfc *WorkflowController) getPod(namespace string, podName string) (*apiv1.Pod, error) {
obj, exists, err := wfc.podInformer.GetStore().GetByKey(namespace + "/" + podName)
if err != nil {
return nil, err
Expand All @@ -605,46 +642,8 @@ func (wfc *WorkflowController) getPodFromCache(namespace string, podName string)
return pod, nil
}

// enablePodForDeletion removes common.FinalizerPodStatus from the named pod
// and, when isCompleted is true, also sets the common.LabelKeyCompleted label
// to "true". The change is written with an Update call through pods.
func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods typedv1.PodInterface, namespace string, podName string, isCompleted bool) error {
	// Every attempt re-reads the Pod from the API and updates a copy of it. If
	// the Pod changed between the Get and the Update, the Update reports a
	// conflict and the whole attempt is run again.
	attempt := func() error {
		livePod, getErr := wfc.getPodFromAPI(ctx, namespace, podName)
		if getErr != nil {
			return getErr
		}

		modified := livePod.DeepCopy()
		if isCompleted {
			if modified.Labels == nil {
				modified.Labels = map[string]string{}
			}
			modified.Labels[common.LabelKeyCompleted] = "true"
		}
		modified.Finalizers = removeFinalizer(modified.Finalizers, common.FinalizerPodStatus)

		_, updateErr := pods.Update(ctx, modified, metav1.UpdateOptions{})
		return updateErr
	}

	return retry.RetryOnConflict(retry.DefaultRetry, attempt)
}

// removeFinalizer returns a new slice holding every entry of finalizers that
// is not equal to targetFinalizer, in the original order. The input slice is
// left untouched, and the result is nil when no entry is kept.
func removeFinalizer(finalizers []string, targetFinalizer string) []string {
	var kept []string
	for i := range finalizers {
		if finalizers[i] == targetFinalizer {
			continue
		}
		kept = append(kept, finalizers[i])
	}
	return kept
}

func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPodFromCache(namespace, podName)
pod, err := wfc.getPod(namespace, podName)
if pod == nil || err != nil {
return 0, err
}
Expand Down Expand Up @@ -1054,8 +1053,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
log.WithError(err).Error("Failed to list pods")
}
for _, p := range podList.Items {
if err := wfc.enablePodForDeletion(ctx, pods, p.Namespace, p.Name, false); err != nil {
log.WithError(err).Error("Failed to enable pod for deletion")
if slice.ContainsString(p.Finalizers, common.FinalizerPodStatus) {
wfc.queuePodForCleanup(p.Namespace, p.Name, deletePod)
Copy link
Member

@agilgur5 agilgur5 Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this changes the previous behavior: pods are now only queued for cleanup instead of being patched immediately, even though the List call has already incurred an API request.

this logic seems to stem from #12413 (comment)

I suppose this is fine as long as the pods are eventually deleted, and the queue is more stable than issuing multiple immediate network requests for every delete.

I also see that this is the scenario that a Workflow finalizer would prevent from happening.

}
}

Expand Down
52 changes: 52 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controller

import (
"context"
"encoding/json"
"os"
imliuda marked this conversation as resolved.
Show resolved Hide resolved
"testing"
"time"

Expand Down Expand Up @@ -1256,3 +1258,53 @@ func TestWorkflowWithLongArguments(t *testing.T) {
assert.True(t, controller.processNextPodCleanupItem(ctx))
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

// TestGetPodCleaupCache checks the patch returned by getPodCleanupPatch for
// each combination of the ARGO_POD_STATUS_CAPTURE_FINALIZER setting and the
// labelPodCompleted flag. Patches are compared after JSON decoding, so key
// order in the expected documents does not matter.
func TestGetPodCleaupCache(t *testing.T) {
imliuda marked this conversation as resolved.
Show resolved Hide resolved
wfc := &WorkflowController{}

pod := &apiv1.Pod{}
pod.SetLabels(map[string]string{common.LabelKeyCompleted: "false"})
pod.SetFinalizers([]string{common.FinalizerPodStatus})
pod.SetResourceVersion("123456")
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

// pod finalizer enabled, patch label: finalizer removed, resourceVersion and label set
os.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "true")
expected := &map[string]interface{}{}
err := json.Unmarshal([]byte(`{"metadata":{"resourceVersion":"123456","finalizers":[],"labels":{"workflows.argoproj.io/completed":"true"}}}`), expected)
assert.Nil(t, err)
imliuda marked this conversation as resolved.
Show resolved Hide resolved
patch, err := wfc.getPodCleanupPatch(pod, true)
assert.Nil(t, err)
actual := &map[string]interface{}{}
err = json.Unmarshal(patch, actual)
assert.Nil(t, err)
assert.Equal(t, expected, actual)
// building the patch must not modify the pod's own finalizers
assert.Len(t, pod.Finalizers, 1)

// pod finalizer enabled, not patch label: only finalizers and resourceVersion
expected = &map[string]interface{}{}
err = json.Unmarshal([]byte(`{"metadata":{"resourceVersion":"123456","finalizers":[]}}`), expected)
assert.Nil(t, err)
imliuda marked this conversation as resolved.
Show resolved Hide resolved
patch, err = wfc.getPodCleanupPatch(pod, false)
assert.Nil(t, err)
actual = &map[string]interface{}{}
err = json.Unmarshal(patch, actual)
assert.Nil(t, err)
assert.Equal(t, expected, actual)

// pod finalizer disabled, patch label: only the label is patched
os.Setenv("ARGO_POD_STATUS_CAPTURE_FINALIZER", "false")
expected = &map[string]interface{}{}
err = json.Unmarshal([]byte(`{"metadata":{"labels":{"workflows.argoproj.io/completed":"true"}}}`), expected)
assert.Nil(t, err)
imliuda marked this conversation as resolved.
Show resolved Hide resolved
patch, err = wfc.getPodCleanupPatch(pod, true)
assert.Nil(t, err)
actual = &map[string]interface{}{}
err = json.Unmarshal(patch, actual)
assert.Nil(t, err)
assert.Equal(t, expected, actual)

// pod finalizer disabled, not patch label: nothing to patch, so the patch is nil
patch, err = wfc.getPodCleanupPatch(pod, false)
assert.Nil(t, err)
assert.Nil(t, patch)
}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,7 @@ func (woc *wfOperationCtx) getPodByNode(node *wfv1.NodeStatus) (*apiv1.Pod, erro
}

podName := woc.getPodName(node.Name, node.TemplateName)
return woc.controller.getPodFromCache(woc.wf.GetNamespace(), podName)
return woc.controller.getPod(woc.wf.GetNamespace(), podName)
}

func (woc *wfOperationCtx) recordNodePhaseEvent(node *wfv1.NodeStatus) {
Expand Down