feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718 #10798
base: master
Conversation
Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED. The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing `/approve` in a comment.
looks like a CI infra issue? /retest just to check
@gmfrasca: The following commands are available to trigger optional jobs:
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
/retest
```go
// Add Parallelism Limit if present
parallel := int64(kfpTask.GetIteratorPolicy().GetParallelismLimit())
if parallel > 0 {
	currentParallelism := dag.Parallelism
	if currentParallelism == nil || parallel > *currentParallelism {
		dag.Parallelism = &parallel
	}
}
```
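Since the compiler may visit several tasks while building one DAG template, the update above effectively keeps the largest positive limit seen so far. A minimal standalone sketch of that selection logic (the `applyLimit` helper and the sample limits are illustrative, not part of the PR):

```go
package main

import "fmt"

// applyLimit mirrors the max-wins update above: a limit is only applied
// if it is positive and larger than the current DAG-wide value.
func applyLimit(current *int64, limit int64) *int64 {
	if limit > 0 && (current == nil || limit > *current) {
		l := limit
		return &l
	}
	return current
}

func main() {
	var dagParallelism *int64
	// Two hypothetical ParallelFor tasks compiled into the same DAG template.
	for _, limit := range []int64{2, 4} {
		dagParallelism = applyLimit(dagParallelism, limit)
	}
	fmt.Println(*dagParallelism) // the larger limit wins for the whole DAG
}
```

This is what produces the multi-loop behavior discussed later in the thread: with limits 2 and 4 in the same DAG, the single template-level field ends up at 4.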
tested with this pipeline (`parallelfor.py`):

```python
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
    message: str,
    output_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
    model: Input[Model],
    epoch: int,
    trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')
```
Worked successfully:

```shell
kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
```

```yaml
dag:
  tasks:
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
          - name: task
            value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
      depends: preprocess.Succeeded
      name: for-loop-2-driver
      template: system-dag-driver
    - arguments:
        parameters:
          - name: parent-dag-id
            value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
          - name: iteration-index
            value: '{{item}}'
      depends: for-loop-2-driver.Succeeded
      name: for-loop-2-iterations
      template: comp-for-loop-2-for-loop-2
      withSequence:
        count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
          - name: task
            value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
          - name: container
            value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
      name: preprocess-driver
      template: system-container-driver
    - arguments:
        parameters:
          - name: pod-spec-patch
            value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
          - default: "false"
            name: cached-decision
            value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
      depends: preprocess-driver.Succeeded
      name: preprocess
      template: system-container-executor
inputs:
  parameters:
    - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2
```
Note the `parallelism: 2` field on the root DAG template.
The UI feedback on this could be better: currently all 4 iterations show as executing at the same time (and they never really show the done checkmark even once they finish). But this problem existed prior to this change, seems out of scope for this task, and should go in a follow-up issue. I did confirm the pods are scheduled 2 at a time in this example (since parallelism = 2 here).
Hrmm, testing it with the above pipeline amended to:

```python
@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)
```

It looks like the workflow will use the higher of the two limits for the whole DAG.
As @HumairAK noted, it looks like there's a problem when there are multiple ParallelFor components in a single DAG, since the `parallelism` field applies to the entire DAG template rather than to each individual loop. The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks (i.e. the root DAG calls a new "XYZ-iterator" DAG, which contains the iterator task and carries its own parallelism limit).
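To make that workaround concrete, here is a hypothetical sketch (the `dagTemplate` struct and the `-iterator` template names are illustrative stand-ins, not the PR's actual data structures): instead of setting `Parallelism` on the shared root template, each ParallelFor would get its own sub-DAG template carrying its own limit.

```go
package main

import "fmt"

// dagTemplate is a stand-in for an Argo Workflow DAG template; only the
// field relevant to this discussion is modeled.
type dagTemplate struct {
	Name        string
	Parallelism *int64 // nil means "no limit"
}

func limitPtr(v int64) *int64 { return &v }

// buildTemplates sketches the re-architecture: the root template carries
// no limit, and each loop's limit lives on its own iterator template.
func buildTemplates() []dagTemplate {
	return []dagTemplate{
		{Name: "root", Parallelism: nil},
		{Name: "for-loop-2-iterator", Parallelism: limitPtr(2)},
		{Name: "for-loop-3-iterator", Parallelism: limitPtr(4)},
	}
}

func main() {
	for _, t := range buildTemplates() {
		if t.Parallelism == nil {
			fmt.Printf("%s: unlimited\n", t.Name)
		} else {
			fmt.Printf("%s: parallelism %d\n", t.Name, *t.Parallelism)
		}
	}
}
```

With this layout the two limits no longer compete for one field, since Argo applies each template's `parallelism` only to that template's own tasks.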
```diff
@@ -72,6 +72,15 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
 	if err != nil {
 		return err
 	}
+	// Add Parallelism Limit if present
+	parallel := int64(kfpTask.GetIteratorPolicy().GetParallelismLimit())
+	if parallel > 0 {
```
Minor nitpick: you could rename the variable to something like `parallelismLimit`, so that it reads as a limit on parallelism rather than a count of parallel processes.
```go
	if currentParallelism == nil || parallel > *currentParallelism {
		dag.Parallelism = &parallel
	}
}
```
You could add an else statement here to set a default parallelism limit if none or an invalid one is specified. Something like:

```go
} else {
	defaultParallelism := int64(1) // Assuming '1' as a safe default
	dag.Parallelism = &defaultParallelism
	log.Infof("Set default parallelism limit of %d due to invalid input", defaultParallelism)
}
```
Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.
Hey @hsteude - this implementation actually already leverages the Argo loop-parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential tasks of a single root DAG template, so a `parallelism` setting on that template governs the whole pipeline rather than one loop. The workaround/DAG re-architecture I mentioned above would bump out each of these ParallelFor tasks into its own sub-DAG template, so each loop's limit would apply only to its own iterations.
…mpiler Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
…ted tasks Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
/test kubeflow-pipelines-samples-v2
@gmfrasca: The following tests failed; say `/retest` to rerun all failed tests.
Description of your changes:
Fixes #8718
Adds the `Parallelism` item to a DAG template if specified by a task's IteratorPolicy (i.e. a ParallelFor with a parallelism limit).

Checklist: