Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Set default value to output parameters if suspend node timeout. Fixes #12230 #12960

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 128 additions & 0 deletions test/e2e/suspend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//go:build functional
// +build functional

package e2e

import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

type TestSuspendSitue struct {
fixtures.E2ESuite
}

func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithoutDefaultValue() {
s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: suspend-node-timeout-without-default-value
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: approve
template: approve
- - name: release
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.approve.outputs.parameters.message}}"
- name: approve
suspend:
duration: 5s
outputs:
parameters:
- name: message
valueFrom:
supplied: {}
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, "raw output parameter 'message' has not been set and does not have a default value", status.Message)
})
}

func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithDefaultValue() {
s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: suspend-node-timeout-with-default-value
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: approve
template: approve
- - name: release
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.approve.outputs.parameters.message}}"
- name: approve
suspend:
duration: 5s
outputs:
parameters:
- name: message
valueFrom:
default: default message
supplied: {}
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.Equal(t, status.Progress, wfv1.Progress("2/2"))
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return status.Name == "suspend-node-timeout-with-default-value[0].approve"
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, 1, len(status.Outputs.Parameters))
assert.Equal(t, "message", status.Outputs.Parameters[0].Name)
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Outputs.Parameters[0].Value)
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return status.Name == "suspend-node-timeout-with-default-value[1].release"
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, 1, len(status.Inputs.Parameters))
assert.Equal(t, "message", status.Inputs.Parameters[0].Name)
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Inputs.Parameters[0].Value)
})
}
3 changes: 3 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3366,6 +3366,9 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string,
if time.Now().UTC().After(suspendDeadline) {
// Suspension is expired, node can be resumed
woc.log.Infof("auto resuming node %s", nodeName)
if err := wfutil.OverrideOutputParametersWithDefault(node.Outputs); err != nil {
return node, err
}
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return node, nil
}
Expand Down
30 changes: 19 additions & 11 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,23 @@ func SuspendWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, workf
return err
}

func OverrideOutputParametersWithDefault(outputs *wfv1.Outputs) error {
if outputs == nil {
return nil
}
for i, param := range outputs.Parameters {
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
if param.ValueFrom.Default != nil {
outputs.Parameters[i].Value = param.ValueFrom.Default
outputs.Parameters[i].ValueFrom = nil
} else {
return fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
}
}
}
return nil
}

// ResumeWorkflow resumes a workflow by setting spec.suspend to nil and any suspended nodes to Successful.
// Retries conflict errors
func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrator hydrator.Interface, workflowName string, nodeFieldSelector string) error {
Expand Down Expand Up @@ -408,17 +425,8 @@ func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrat
// To resume a workflow with a suspended node we simply mark the node as Successful
for nodeID, node := range wf.Status.Nodes {
if node.IsActiveSuspendNode() {
if node.Outputs != nil {
for i, param := range node.Outputs.Parameters {
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
if param.ValueFrom.Default != nil {
node.Outputs.Parameters[i].Value = param.ValueFrom.Default
node.Outputs.Parameters[i].ValueFrom = nil
} else {
return false, fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
}
}
}
if err := OverrideOutputParametersWithDefault(node.Outputs); err != nil {
return false, err
}
node.Phase = wfv1.NodeSucceeded
if node.Message != "" {
Expand Down