Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Daemon pods keep running after workflow DAG fails when failFast enabled. Fixes: #10313 #12871

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 41 additions & 15 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ var (
// ErrDeadlineExceeded indicates the operation exceeded its deadline for execution
ErrDeadlineExceeded = errors.New(errors.CodeTimeout, "Deadline exceeded")
// ErrParallelismReached indicates this workflow reached its parallelism limit
ErrParallelismReached = errors.New(errors.CodeForbidden, "Max parallelism reached")
ErrParallelismReached = errors.New(errors.CodeForbidden, "Max parallelism reached")
// ErrFailFastReached indicates that the workflow failed fast
ErrFailFastReached = errors.New(errors.CodeForbidden, "Fail fast reached")
ErrResourceRateLimitReached = errors.New(errors.CodeForbidden, "resource creation rate-limit reached")
// ErrTimeout indicates a specific template timed out
ErrTimeout = errors.New(errors.CodeTimeout, "timeout")
Expand Down Expand Up @@ -1957,6 +1959,11 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return node, err
}

// Check if we reach template or workflow FailFast and immediately return if we did
if err := woc.checkFailFast(processedTmpl, node, opts.boundaryID); err != nil {
return node, err
}

unlockedNode := false

if processedTmpl.Synchronization != nil {
Expand Down Expand Up @@ -2748,14 +2755,8 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
return ErrParallelismReached
}

// If we are a DAG or Steps template, check if we have active pods or unsuccessful children
// If we are a DAG or Steps template, check if we have active pods
if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) {
// Check failFast
if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 {
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
return ErrParallelismReached
}

// Check parallelism
if tmpl.HasParallelism() && woc.getActivePods(node.ID) >= *tmpl.Parallelism {
woc.log.Infof("template (node %s) active children parallelism exceeded %d", node.ID, *tmpl.Parallelism)
Expand All @@ -2764,6 +2765,37 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
}

// if we are about to execute a pod, make sure our parent hasn't reached it's limit
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
if err != nil {
return err
}
// A new template was stored during resolution, persist it
if templateStored {
woc.updated = true
}

// Check parallelism
if boundaryTemplate.HasParallelism() && woc.getActiveChildren(boundaryID) >= *boundaryTemplate.Parallelism {
woc.log.Infof("template (node %s) active children parallelism exceeded %d", boundaryID, *boundaryTemplate.Parallelism)
return ErrParallelismReached
}
}
return nil
}

// checkFailFast checks if the given template is failFast enabled, considering whether the workflow/template have unsuccessful children or not.
func (woc *wfOperationCtx) checkFailFast(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
// If we are a DAG or Steps template, check if we have unsuccessful children
if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) {
// Check failFast
if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 {
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
return ErrFailFastReached
}
}

// if we are about to execute a pod, make sure our parent has failFast enabled and unsuccessful children
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
if err != nil {
Expand All @@ -2782,13 +2814,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
// Check failFast
if boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 {
woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
return ErrParallelismReached
}

// Check parallelism
if boundaryTemplate.HasParallelism() && woc.getActiveChildren(boundaryID) >= *boundaryTemplate.Parallelism {
woc.log.Infof("template (node %s) active children parallelism exceeded %d", boundaryID, *boundaryTemplate.Parallelism)
return ErrParallelismReached
return ErrFailFastReached
}
}
return nil
Expand Down
197 changes: 0 additions & 197 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7830,203 +7830,6 @@ func TestHasOutputResultRef(t *testing.T) {
assert.True(t, hasOutputResultRef("generate-random-1", &wf.Spec.Templates[0]))
}

const stepsFailFast = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
creationTimestamp: "2021-03-12T15:28:29Z"
name: seq-loop-pz4hh
spec:
activeDeadlineSeconds: 300
arguments:
parameters:
- name: items
value: |
["a", "b", "c"]
entrypoint: seq-loop
templates:
- failFast: true
inputs:
parameters:
- name: items
name: seq-loop
parallelism: 1
steps:
- - name: iteration
template: iteration
withParam: '{{inputs.parameters.items}}'
- name: iteration
steps:
- - name: step1
template: succeed-step
- - name: step2
template: failed-step
- container:
args:
- exit 0
command:
- /bin/sh
- -c
image: alpine
name: succeed-step
- container:
args:
- exit 1
command:
- /bin/sh
- -c
image: alpine
name: failed-step
retryStrategy:
limit: 1
ttlStrategy:
secondsAfterCompletion: 600
status:
nodes:
seq-loop-pz4hh:
children:
- seq-loop-pz4hh-3652003332
displayName: seq-loop-pz4hh
id: seq-loop-pz4hh
inputs:
parameters:
- name: items
value: |
["a", "b", "c"]
name: seq-loop-pz4hh
outboundNodes:
- seq-loop-pz4hh-4172612902
phase: Running
startedAt: "2021-03-12T15:28:29Z"
templateName: seq-loop
templateScope: local/seq-loop-pz4hh
type: Steps
seq-loop-pz4hh-347271843:
boundaryID: seq-loop-pz4hh-1269516111
displayName: step2(0)
finishedAt: "2021-03-12T15:28:39Z"
hostNodeName: k3d-k3s-default-server-0
id: seq-loop-pz4hh-347271843
message: Error (exit code 1)
name: seq-loop-pz4hh[0].iteration(0:a)[1].step2(0)
phase: Failed
startedAt: "2021-03-12T15:28:33Z"
templateName: failed-step
templateScope: local/seq-loop-pz4hh
type: Pod
seq-loop-pz4hh-1269516111:
boundaryID: seq-loop-pz4hh
children:
- seq-loop-pz4hh-3596771579
displayName: iteration(0:a)
id: seq-loop-pz4hh-1269516111
name: seq-loop-pz4hh[0].iteration(0:a)
outboundNodes:
- seq-loop-pz4hh-4172612902
phase: Running
startedAt: "2021-03-12T15:28:29Z"
templateName: iteration
templateScope: local/seq-loop-pz4hh
type: Steps
seq-loop-pz4hh-1287186880:
boundaryID: seq-loop-pz4hh-1269516111
children:
- seq-loop-pz4hh-347271843
- seq-loop-pz4hh-4172612902
displayName: step2
id: seq-loop-pz4hh-1287186880
name: seq-loop-pz4hh[0].iteration(0:a)[1].step2
phase: Failed
startedAt: "2021-03-12T15:28:33Z"
templateName: failed-step
templateScope: local/seq-loop-pz4hh
type: Retry
seq-loop-pz4hh-3596771579:
boundaryID: seq-loop-pz4hh-1269516111
children:
- seq-loop-pz4hh-4031713604
displayName: '[0]'
finishedAt: "2021-03-12T15:28:33Z"
id: seq-loop-pz4hh-3596771579
name: seq-loop-pz4hh[0].iteration(0:a)[0]
phase: Succeeded
startedAt: "2021-03-12T15:28:29Z"
templateScope: local/seq-loop-pz4hh
type: StepGroup
seq-loop-pz4hh-3652003332:
boundaryID: seq-loop-pz4hh
children:
- seq-loop-pz4hh-1269516111
displayName: '[0]'
id: seq-loop-pz4hh-3652003332
name: seq-loop-pz4hh[0]
phase: Running
startedAt: "2021-03-12T15:28:29Z"
templateScope: local/seq-loop-pz4hh
type: StepGroup
seq-loop-pz4hh-3664029150:
boundaryID: seq-loop-pz4hh-1269516111
children:
- seq-loop-pz4hh-1287186880
displayName: '[1]'
id: seq-loop-pz4hh-3664029150
name: seq-loop-pz4hh[0].iteration(0:a)[1]
phase: Running
startedAt: "2021-03-12T15:28:33Z"
templateScope: local/seq-loop-pz4hh
type: StepGroup
seq-loop-pz4hh-4031713604:
boundaryID: seq-loop-pz4hh-1269516111
children:
- seq-loop-pz4hh-3664029150
displayName: step1
finishedAt: "2021-03-12T15:28:32Z"
hostNodeName: k3d-k3s-default-server-0
id: seq-loop-pz4hh-4031713604
name: seq-loop-pz4hh[0].iteration(0:a)[0].step1
phase: Succeeded
startedAt: "2021-03-12T15:28:29Z"
templateName: succeed-step
templateScope: local/seq-loop-pz4hh
type: Pod
seq-loop-pz4hh-4172612902:
boundaryID: seq-loop-pz4hh-1269516111
displayName: step2(1)
finishedAt: "2021-03-12T15:28:47Z"
hostNodeName: k3d-k3s-default-server-0
id: seq-loop-pz4hh-4172612902
message: Error (exit code 1)
name: seq-loop-pz4hh[0].iteration(0:a)[1].step2(1)
phase: Failed
startedAt: "2021-03-12T15:28:41Z"
templateName: failed-step
templateScope: local/seq-loop-pz4hh
type: Pod
phase: Running
startedAt: "2021-03-12T15:28:29Z"

`

func TestStepsFailFast(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(stepsFailFast)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowFailed, woc.wf.Status.Phase)
node := woc.wf.Status.Nodes.FindByDisplayName("iteration(0:a)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
}
node = woc.wf.Status.Nodes.FindByDisplayName("seq-loop-pz4hh")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
}
}

func TestGetStepOrDAGTaskName(t *testing.T) {
assert.Equal(t, "generate-artifact", getStepOrDAGTaskName("data-transformation-gjrt8[0].generate-artifact(2:foo/script.py)"))
assert.Equal(t, "generate-artifact", getStepOrDAGTaskName("data-transformation-gjrt8[0].generate-artifact(2:foo/scrip[t.py)"))
Expand Down