Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure node phases propagate correctly. Fixes #12869 #12991

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 28 additions & 7 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type dagContext struct {
// tasks are all the tasks in the template
tasks []wfv1.DAGTask

// nodeToTask maps a task's node ID (as computed by taskNodeID) to its DAGTask definition
nodeToTask map[string]*wfv1.DAGTask

// visited keeps track of tasks we have already visited during an invocation of executeDAG
// in order to avoid duplicating work
visited map[string]bool
Expand Down Expand Up @@ -143,7 +146,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
// will take precedence as the branch phase for their descendants.
targetTaskPhases := make(map[string]wfv1.NodePhase)
for _, task := range targetTasks {
targetTaskPhases[d.taskNodeID(task)] = ""
targetTaskPhases[d.taskNodeID(task)] = wfv1.NodeOmitted
}

boundaryNode, err := nodes.Get(d.boundaryID)
Expand All @@ -168,13 +171,19 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
return wfv1.NodeRunning, nil
}

task := d.nodeToTask[curr.nodeId]

// Only overwrite the branchPhase if this node completed. (If it didn't, we can just inherit our parent's branchPhase.)
if node.Completed() {
branchPhase = node.Phase

if !curr.phase.FailedOrError() || (task != nil && !task.ContinuesOn(curr.phase)) {
branchPhase = node.Phase
}
}

previousPhase, isTargetTask := targetTaskPhases[node.ID]
// This node is a target task, so it will not have any children. Store or deduce its phase
if previousPhase, isTargetTask := targetTaskPhases[node.ID]; isTargetTask {
if isTargetTask {
// Since we want Failed or Errored phases to have preference over Succeeded in case of ambiguity, only update
// the deduced phase of the target task if it is not already Failed or Errored.
// Note that if the target task is NOT omitted (i.e. it Completed), then this check is moot, because every time
Expand All @@ -193,11 +202,15 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh

// We only succeed if all the target tasks have been considered (i.e. their nodes have been created) and there are no failures
failFast := d.tmpl.DAG.FailFast == nil || *d.tmpl.DAG.FailFast
result := wfv1.NodeSucceeded
result := boundaryNode.Phase
if result == wfv1.NodeRunning {
result = wfv1.NodeSucceeded
}

for _, depName := range targetTasks {
branchPhase := targetTaskPhases[d.taskNodeID(depName)]
if branchPhase == "" {
result = wfv1.NodeRunning
if branchPhase == wfv1.NodeOmitted {
result = wfv1.NodeSucceeded
// If failFast is disabled, we will want to let all tasks complete before checking for failures
if !failFast {
break
Expand All @@ -211,7 +224,6 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
if task := d.GetTask(depName); task.ContinuesOn(branchPhase) {
continue
}

result = branchPhase
// If failFast is enabled, don't check to see if other target tasks are complete and fail now instead
if failFast {
Expand Down Expand Up @@ -246,6 +258,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
boundaryID: node.ID,
tasks: tmpl.DAG.Tasks,
visited: make(map[string]bool),
nodeToTask: make(map[string]*wfv1.DAGTask),
tmpl: tmpl,
wf: woc.wf,
tmplCtx: tmplCtx,
Expand All @@ -263,6 +276,12 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
targetTasks = strings.Split(tmpl.DAG.Target, " ")
}

for _, task := range tmpl.DAG.Tasks {
task := task
taskId := dagCtx.taskNodeID(task.Name)
dagCtx.nodeToTask[taskId] = &task
}

// kick off execution of each target task asynchronously
onExitCompleted := true
for _, taskName := range targetTasks {
Expand Down Expand Up @@ -350,6 +369,8 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
}
outputs, err := getTemplateOutputsFromScope(tmpl, scope)
if err != nil {
_ = woc.markNodePhase(nodeName, wfv1.NodeError)
err = woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)
woc.log.Errorf("unable to get outputs")
return node, err
}
Expand Down