Skip to content

Commit

Permalink
fix: ensure node phases propagate correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Isitha Subasinghe <isubasinghe@student.unimelb.edu.au>
  • Loading branch information
isubasinghe committed Apr 29, 2024
1 parent f1ab5aa commit 969cc95
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type dagContext struct {
// tasks are all the tasks in the template
tasks []wfv1.DAGTask

// nodeToTask maps a task's node ID (as produced by taskNodeID) to its DAG task, so a node can be resolved back to its task
nodeToTask map[string]*wfv1.DAGTask

// visited keeps track of tasks we have already visited during an invocation of executeDAG
// in order to avoid duplicating work
visited map[string]bool
Expand Down Expand Up @@ -143,7 +146,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
// will take precedence as the branch phase for their descendants.
targetTaskPhases := make(map[string]wfv1.NodePhase)
for _, task := range targetTasks {
targetTaskPhases[d.taskNodeID(task)] = ""
targetTaskPhases[d.taskNodeID(task)] = wfv1.NodeOmitted
}

boundaryNode, err := nodes.Get(d.boundaryID)
Expand All @@ -168,13 +171,19 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
return wfv1.NodeRunning, nil
}

task := d.nodeToTask[curr.nodeId]

// Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase).
if node.Completed() {
branchPhase = node.Phase

if !curr.phase.FailedOrError() || (task != nil && !task.ContinuesOn(curr.phase)) {
branchPhase = node.Phase
}
}

previousPhase, isTargetTask := targetTaskPhases[node.ID]
// This node is a target task, so it will not have any children. Store or deduce its phase
if previousPhase, isTargetTask := targetTaskPhases[node.ID]; isTargetTask {
if isTargetTask {
// Since we want Failed or Errored phases to have preference over Succeeded in case of ambiguity, only update
// the deduced phase of the target task if it is not already Failed or Errored.
// Note that if the target task is NOT omitted (i.e. it Completed), then this check is moot, because every time
Expand All @@ -193,11 +202,15 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh

// We only succeed if all the target tasks have been considered (i.e. its nodes created) and there are no failures
failFast := d.tmpl.DAG.FailFast == nil || *d.tmpl.DAG.FailFast
result := wfv1.NodeSucceeded
result := boundaryNode.Phase
if result == wfv1.NodeRunning {
result = wfv1.NodeSucceeded
}

for _, depName := range targetTasks {
branchPhase := targetTaskPhases[d.taskNodeID(depName)]
if branchPhase == "" {
result = wfv1.NodeRunning
branchPhase, _ := targetTaskPhases[d.taskNodeID(depName)]

Check failure on line 211 in workflow/controller/dag.go

View workflow job for this annotation

GitHub Actions / Lint

S1005: unnecessary assignment to the blank identifier (gosimple)
if branchPhase == wfv1.NodeOmitted {
result = wfv1.NodeSucceeded
// If failFast is disabled, we will want to let all tasks complete before checking for failures
if !failFast {
break
Expand All @@ -211,7 +224,6 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh
if task := d.GetTask(depName); task.ContinuesOn(branchPhase) {
continue
}

result = branchPhase
// If failFast is enabled, don't check to see if other target tasks are complete and fail now instead
if failFast {
Expand Down Expand Up @@ -246,6 +258,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
boundaryID: node.ID,
tasks: tmpl.DAG.Tasks,
visited: make(map[string]bool),
nodeToTask: make(map[string]*wfv1.DAGTask),
tmpl: tmpl,
wf: woc.wf,
tmplCtx: tmplCtx,
Expand All @@ -263,6 +276,11 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
targetTasks = strings.Split(tmpl.DAG.Target, " ")
}

for _, task := range tmpl.DAG.Tasks {
taskId := dagCtx.taskNodeID(task.Name)
dagCtx.nodeToTask[taskId] = &task

Check failure on line 281 in workflow/controller/dag.go

View workflow job for this annotation

GitHub Actions / Lint

exporting a pointer for the loop variable task (exportloopref)
}

// kick off execution of each target task asynchronously
onExitCompleted := true
for _, taskName := range targetTasks {
Expand Down Expand Up @@ -350,6 +368,8 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
}
outputs, err := getTemplateOutputsFromScope(tmpl, scope)
if err != nil {
_ = woc.markNodePhase(nodeName, wfv1.NodeError)
err = woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)
woc.log.Errorf("unable to get outputs")
return node, err
}
Expand Down

0 comments on commit 969cc95

Please sign in to comment.