-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New engine: parent task tracking #12915
Conversation
✅ Deploy Preview for prefect-docs-preview ready!
To edit notification comments on pull requests, go to your Netlify site configuration. |
src/prefect/new_flow_engine.py
Outdated
task_engine = TaskRunEngine(task=dummy_task, parameters=self.parameters) | ||
return await task_engine.create_task_run(client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #13153 I added a create_run
method to Task
. What do you think of that as a centralized place for creating task runs rather than instantiating an engine to create a task run?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely, that feels clean to me. And avoids the engine object feeling overloaded too. The reason I consolidated here was just to ensure that the "dummy" subflow task was being created with exactly the same logic we used for all other tasks, so this sounds very in line. Happy to do that here or support the change in #13153, whatever order of operations plays out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick update: I moved this logic to use the new method but it doesn't include the parent key and other logic still on the task run engine, so some things failed. Rather than mirror all of that into Task.create_run()
in this PR, I opted to leave as is and if/when you use migrate TaskRunEngine.create_task_run
over to the new method, this will go with it smoothly.
task_inputs["wait_for"] = [TaskRunResult(id=task_run_ctx.task_run.id)] | ||
# there is no flow run | ||
if not flow_run_ctx: | ||
task_inputs["__parents__"] = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed the __parents__
key is a list. Is there a scenario where a task could have multiple parents?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha, @cicdw and I debated exactly this. We really wanted to say no but here is what we learned:
- the Server API already types this as a list, so the point is unfortunately moot and it's going to have to be a list even if just for compatibility reasons
- it turns out... yes there is a situation! In New engine: add support for sync and async generator tasks #13138 I experimented with extending the engine to support generator tasks. The jury is still out on whether we want to support that (it's cool, but creates some question marks for how things like task.submit work) but if a task received inputs from two different running generators, then one of the rules we identified for that PR is that the task would actually consider both generators to be its parents! So again one example does not a rule make, but I think it's not out of the question that we'll find situations where a task has multiple parent tasks as well as multiple upstream dependencies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there are some merge conflicts, but overall this LGTM!
This PR improves tracking of nested tasks. Currently the parent is tracked under the
wait_for
key of the task's dependency map, but wait_for is a different behavior than the parent. This changes the key to__parent__
to indicate it is a special kind of dependency and also avoid conflicts with any true task kwargs.