Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718 #10798

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

gmfrasca
Copy link
Member

@gmfrasca gmfrasca commented May 7, 2024

Description of your changes:
Fixes #8718

Adds the Parallelism item to a DAG template if specified by a task's IteratorPolicy (ie ParallelFor w/ a parallelism limit).

Checklist:

Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@gmfrasca
Copy link
Member Author

gmfrasca commented May 7, 2024

looks like a CI infra issue?

/retest just to check

Copy link

@gmfrasca: The /retest command does not accept any targets.
The following commands are available to trigger required jobs:

  • /test kfp-kubernetes-test-python310
  • /test kfp-kubernetes-test-python311
  • /test kfp-kubernetes-test-python312
  • /test kfp-kubernetes-test-python38
  • /test kfp-kubernetes-test-python39
  • /test kubeflow-pipeline-backend-test
  • /test kubeflow-pipeline-frontend-test
  • /test kubeflow-pipeline-mkp-snapshot-test
  • /test kubeflow-pipeline-mkp-test
  • /test kubeflow-pipelines-backend-visualization
  • /test kubeflow-pipelines-component-yaml
  • /test kubeflow-pipelines-components-google-cloud-python38
  • /test kubeflow-pipelines-integration-v2
  • /test kubeflow-pipelines-manifests
  • /test kubeflow-pipelines-sdk-docformatter
  • /test kubeflow-pipelines-sdk-execution-tests
  • /test kubeflow-pipelines-sdk-isort
  • /test kubeflow-pipelines-sdk-python310
  • /test kubeflow-pipelines-sdk-python311
  • /test kubeflow-pipelines-sdk-python312
  • /test kubeflow-pipelines-sdk-python38
  • /test kubeflow-pipelines-sdk-python39
  • /test kubeflow-pipelines-sdk-yapf
  • /test test-kfp-runtime-code-python310
  • /test test-kfp-runtime-code-python311
  • /test test-kfp-runtime-code-python312
  • /test test-kfp-runtime-code-python38
  • /test test-kfp-runtime-code-python39
  • /test test-run-all-gcpc-modules
  • /test test-upgrade-kfp-sdk

The following commands are available to trigger optional jobs:

  • /test kfp-kubernetes-execution-tests
  • /test kubeflow-pipeline-e2e-test
  • /test kubeflow-pipeline-upgrade-test
  • /test kubeflow-pipeline-upgrade-test-v2
  • /test kubeflow-pipelines-samples-v2

Use /test all to run the following jobs that were automatically triggered:

  • kubeflow-pipeline-backend-test
  • kubeflow-pipeline-e2e-test
  • kubeflow-pipeline-upgrade-test
  • kubeflow-pipeline-upgrade-test-v2
  • kubeflow-pipelines-samples-v2

In response to this:

looks like a CI infra issue?

/retest just to check

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@gmfrasca
Copy link
Member Author

gmfrasca commented May 7, 2024

/retest

Comment on lines 75 to 83
// Add Parallelism Limit if present
parallel := int64(kfpTask.GetIteratorPolicy().GetParallelismLimit())
if parallel > 0 {
currentParallelism := dag.Parallelism
if currentParallelism == nil || parallel > *currentParallelism {
dag.Parallelism = &parallel
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gmfrasca can we get a test for this here?

Something that confirms that when in the pipeline dag we specify parallelism, it gets set in the argo workflow?

@HumairAK
Copy link
Contributor

HumairAK commented May 9, 2024

tested with this pipeline:

parallelfor.py
from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def preprocess(
        message: Input[str],
        output_model: Output[Model]
):
    import random
    line = "some_model"
    print(f"Message: {message}")
    with open(output_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    output_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def train(
        model: Input[Model],
        epoch: Input[int],
        trained_model: Output[Model],
):
    import random
    line = "some_model"
    print(f"Train for epoch: {epoch}")
    with open(trained_model.path, 'w') as output_file:
        output_file.write('line: {}'.format(line))
    trained_model.metadata['accuracy'] = random.uniform(0, 1)


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')

Worked successfully:

~ $ kubectl -n ${kfp_ns} get workflow tutorial-data-passing-drsfz -o yaml | yq '.spec.templates[-2]'
dag:
  tasks:
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-for-loop-2}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
          - name: task
            value: '{"componentRef":{"name":"comp-for-loop-2"},"dependentTasks":["preprocess"],"inputs":{"artifacts":{"pipelinechannel--preprocess-output_model":{"taskOutputArtifact":{"outputArtifactKey":"output_model","producerTask":"preprocess"}}}},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[1, 5, 10, 25]"}},"taskInfo":{"name":"for-loop-2"}}'
      depends: preprocess.Succeeded
      name: for-loop-2-driver
      template: system-dag-driver
    - arguments:
        parameters:
          - name: parent-dag-id
            value: '{{tasks.for-loop-2-driver.outputs.parameters.execution-id}}'
          - name: iteration-index
            value: '{{item}}'
      depends: for-loop-2-driver.Succeeded
      name: for-loop-2-iterations
      template: comp-for-loop-2-for-loop-2
      withSequence:
        count: '{{tasks.for-loop-2-driver.outputs.parameters.iteration-count}}'
    - arguments:
        parameters:
          - name: component
            value: '{{workflow.annotations.pipelines.kubeflow.org/components-comp-preprocess}}'
          - name: task
            value: '{"cachingOptions":{},"componentRef":{"name":"comp-preprocess"},"inputs":{"parameters":{"message":{"runtimeValue":{"constant":"dataset"}}}},"taskInfo":{"name":"preprocess"}}'
          - name: container
            value: '{{workflow.annotations.pipelines.kubeflow.org/implementations-comp-preprocess}}'
          - name: parent-dag-id
            value: '{{inputs.parameters.parent-dag-id}}'
      name: preprocess-driver
      template: system-container-driver
    - arguments:
        parameters:
          - name: pod-spec-patch
            value: '{{tasks.preprocess-driver.outputs.parameters.pod-spec-patch}}'
          - default: "false"
            name: cached-decision
            value: '{{tasks.preprocess-driver.outputs.parameters.cached-decision}}'
      depends: preprocess-driver.Succeeded
      name: preprocess
      template: system-container-executor
inputs:
  parameters:
    - name: parent-dag-id
metadata:
  annotations:
    sidecar.istio.io/inject: "false"
name: root
outputs: {}
parallelism: 2

Note the:

parallelism: 2

The UI feedback on this could be better:

image

Currently all 4 iterations show executing at the same time (they also never really show the done checkmark even once they finish). But this problem existed prior to this, and seems out of scope for this task, and should be in a follow up issue.

However I confirmed the pods are scheduled 2 at a time in this example (since parallelism = 2 in this example).

@HumairAK
Copy link
Contributor

HumairAK commented May 9, 2024

Hrmm testing it with the above pipeline amended to:

@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline():
    preprocess_task = preprocess(message="dataset").set_caching_options(enable_caching=False)
    with dsl.ParallelFor(items=[1, 5, 10, 25], parallelism=2) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items=[6, 12, 24, 48], parallelism=4) as epochs:
        train(model=preprocess_task.outputs['output_model'], epoch=epochs).set_caching_options(enable_caching=False)

It looks like the workflow will use parallelism = 4 for both

@gmfrasca
Copy link
Member Author

gmfrasca commented May 9, 2024

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

@@ -72,6 +72,15 @@ func (c *workflowCompiler) DAG(name string, componentSpec *pipelinespec.Componen
if err != nil {
return err
}
// Add Parallelism Limit if present
parallel := int64(kfpTask.GetIteratorPolicy().GetParallelismLimit())
if parallel > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nitpick: You could change the variable name to something like parallelismLimit so that it represents a limit on parallelism rather than a count of parallel processes.

if currentParallelism == nil || parallel > *currentParallelism {
dag.Parallelism = &parallel
}
}
Copy link
Contributor

@DharmitD DharmitD May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add an else statement here to set a default parallelism limit if none or an invalid one is specified. Something like:

else {
    defaultParallelism := int64(1)  // Assuming '1' as a safe default
    dag.Parallelism = &defaultParallelism
    log.Infof("Set default parallelism limit of %d due to invalid input", defaultParallelism)
}

@hsteude
Copy link
Contributor

hsteude commented May 10, 2024

as @HumairAK noted it looks like there's a problem when there are multiple ParallelFor components in a single DAG, as the parallelism mechanism applies to the entire DAG, and as it stands there's no way to assign various parallelismLimits to individual tasks within a DAG template.

The obvious solution/workaround for this is to add another layer of abstraction on iterator tasks, (ie, root DAG calls a new "XYZ-iterator" DAG, which contains the withSequence iterator and parallelism limit value), but this will make the DAG a bit more complex than it already is, and I'm not sure at this moment what the consequences of updating the workflow path to this may be. will need a bit to investigate.

Hi @gmfrasca, I think it's really important to have a limit that applies to the individual for loop, not the entire DAG. Have you considered using Argo's implementation of loop parallelism? Using that might even simplify the DAG. However, implementing this could lead to issues with other backends.

@gmfrasca
Copy link
Member Author

Hey @hsteude - so this implementation actually already leverages the Argo loop parallelism mechanism. The issue here is that the current compiled architecture of a pipeline aggregates all KFP pipeline steps into sequential tasks of a top-level root DAG Template, but the finest granularity you can specify that limit is at the Template level, not an individual DAGTask. Essentially, we do not have the concept of parallelism on a per-step basis to use in this current state.

The workaround/DAG re-architecture I mentioned above would bump out each of these steps to call their own intermediate Template, each time with its own DAG and iterator, and this template would simply call the component Template itself. With that, we could then specify individual parallelism limits for individual steps, since they are now encapsulated in a Template, at the cost of introducing another layer of abstraction/templating

…mpiler

Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
…ted tasks

Signed-off-by: Giulio Frasca <gfrasca@redhat.com>
@google-oss-prow google-oss-prow bot added size/XL and removed size/XS labels May 13, 2024
@gmfrasca
Copy link
Member Author

/test kubeflow-pipelines-samples-v2

Copy link

@gmfrasca: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
kubeflow-pipeline-upgrade-test 9780650 link false /test kubeflow-pipeline-upgrade-test
kubeflow-pipeline-e2e-test 9780650 link false /test kubeflow-pipeline-e2e-test
kubeflow-pipelines-samples-v2 9780650 link false /test kubeflow-pipelines-samples-v2

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat(backend): Support loop parallelism
4 participants