
Support gaps for notebooks: Python source task with a notebook upstream chokes trying to deserialize 'nb' part of upstream #1097

Open
marr75 opened this issue Apr 20, 2023 · 2 comments


marr75 commented Apr 20, 2023

Related to #1088

I've been working on a reference ploomber pipeline for my team on and off and have hit an issue when combining serializer/unserializer (which I think should be the default practice to support IoC in pipelines) with a mix of notebook tasks and Python source tasks.

pipeline.yaml as follows:

meta:
  extract_upstream: false # this is a workaround for another oddity of how upstreams in notebooks vs source tasks work

serializer: common.serialization.default_serializer # just a mapping of the default ploomber serializers + HDFS (sketched below the yaml)
unserializer: common.serialization.default_deserializer # just a mapping of the default ploomber deserializers + HDFS

tasks:
  - source: sql/load-components.sql # simple SQLDump task
    name: load-components
    client: common.clients.get_mysidewalk_client # gets a connection to a specific database
    chunksize: None # chunksize is poorly supported on the downstream so I am skipping it for now
    product: output/load-components.parquet

  - source: scripts/resolve-columns.ipynb # notebook task, uses jupysql which was poorly supported in the .py notebook(?)
    static_analysis: disable
    upstream: [load-components]
    product:
      data: output/resolve-columns.parquet
      nb: reports/resolve-columns.html # required product; since I already execute from the .ipynb and would rather commit the jupytext .py version for PRs and review, I didn't want to make another .ipynb version

 # The following 2 tasks replace the notebook task above with the exact same input and output but using python source tasks and sqlalchemy instead of ipynb tasks and jupysql
  - source: tasks.columns.unpack
    name: unpack-columns
    upstream: [load-components]
    product: output/unpack-columns.parquet

  - source: tasks.columns.resolve
    name: resolve-columns-2
    upstream: [unpack-columns]
    product: output/resolve-columns-2.parquet
# end replacement tasks

  - source: tasks.polygon_features.unpack
    name: unpack-polygon-features
    upstream: [load-components]
    product: output/unpack-polygon-features.parquet

  - source: tasks.polygon_features.resolve
    name: resolve-polygon-features
    upstream: [unpack-polygon-features]
    product: output/resolve-polygon-features.parquet

# this task will fail if resolve-columns is upstream but succeed with resolve-columns-2 upstream
  - source: tasks.join.combine_data
    name: combine-data
    upstream: [load-components, resolve-columns, resolve-polygon-features]
    product: output/combine-data.parquet
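
For context, common/serialization.py is roughly the following (a sketch: the HDF helpers and the defaults list here are stand-ins for my actual ones):

import pandas

from ploomber.io import serializer, unserializer

# extensions handled by ploomber's built-in (de)serializers
defaults = [".csv", ".parquet"]


def _df_to_hdfs(df, product):
    # stand-in for the real HDF writer
    df.to_hdf(str(product), key="data")


def _hdfs_to_df(product):
    # stand-in for the real HDF reader
    return pandas.read_hdf(str(product), key="data")


@serializer({".h5": _df_to_hdfs}, fallback=True, defaults=defaults)
def default_serializer(obj, product):
    pass


@unserializer({".h5": _hdfs_to_df}, fallback=True, defaults=defaults)
def default_deserializer(product):
    pass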

Right now, combine-data is just:

import pandas


def combine_data(upstream):
    components = upstream["load-components"]
    columns = upstream["resolve-columns"]["data"]  # works fine as upstream["resolve-columns-2"]
    features = upstream["resolve-polygon-features"]
    return pandas.DataFrame()

It executes with no issue when resolve-columns-2 is the upstream. It produces the following error when resolve-columns is the upstream:

Loading pipeline...
Building task 'combine-data': 100%|██████████| 1/1 [00:03<00:00,  3.02s/it]

=================================================== DAG build failed ===================================================
------------------- PythonCallable: combine-data -> File('output/combine-data.parquet') -------------------
------------------- ~/development/repos/gpt-experiments/callout_preamble/tasks/join.py:4 -------------------
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 554, in _build
    res = self._run()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 663, in _run
    self.run()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 235, in run
    params = _unserialize_params(self.params, self._unserializer)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 47, in _unserialize_params
    params["upstream"] = {
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 48, in <dictcomp>
    k: _unserializer(v, unserializer) for k, v in params["upstream"].items()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 38, in _unserializer
    return unserializer(product)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/util/dotted_path.py", line 90, in __call__
    out = self._callable(*args, **kwargs_final)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 135, in wrapper
    return {
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 136, in <dictcomp>
    key: _unserialize_product(value, extension_mapping_final,
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 191, in _unserialize_product
    return unserializer(product)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 164, in _unserialize
    obj = fn(f)
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "~/mambaforge/envs/gpt-experiments/lib/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 573, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Error building task "combine-data"
"""

ploomber.exceptions.TaskBuildError: Error building task "combine-data"
=================================================== Summary (1 task) ===================================================
PythonCallable: combine-data -> File('output//combine-data.parquet')
=================================================== DAG build failed ===================================================

Need help? https://ploomber.io/community

I'm fairly certain the error is related to trying to use pickle to unserialize the 'nb' sub-product of the resolve-columns upstream (unpickling and HTML aren't friends). I can work around it by not using notebooks, but that would really limit adoption in my org if notebooks were never an option or couldn't be used with Python callables and IoC features.
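
For reference, this is easy to reproduce outside of ploomber once the HTML report exists, since pickle chokes on the very first byte of an HTML file:

import pickle

# the 'nb' sub-product is an HTML report; it starts with '<',
# which is not a valid pickle opcode
with open("reports/resolve-columns.html", "rb") as f:
    pickle.load(f)  # _pickle.UnpicklingError: invalid load key, '<'.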


marr75 commented Apr 20, 2023

Can confirm. By changing my deserializer definition to the following:

from ploomber.io import unserializer

# _hdfs_to_df and defaults are the same ones from common/serialization.py
@unserializer({".h5": _hdfs_to_df, ".html": lambda product: None}, fallback=True, defaults=defaults)
def default_deserializer(product):
    pass

the error is avoided.

Suggestion: create a special top-level key for the notebook (outside of the product) or skip the 'nb' sub-product in deserialization.

Better suggestion: lazy-load the products of any upstream. That way, upstreams with multiple products that are only partially consumed by downstream tasks don't incur unnecessary deserialization and memory overhead, and "peripheral" products like 'nb' are never deserialized.
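
To illustrate the shape that could take (just a sketch, not ploomber's internals): wrap the upstream sub-products in a mapping that only calls the unserializer when a key is actually accessed, so combine-data would read 'data' and never touch 'nb'.

from collections.abc import Mapping


class LazyUpstream(Mapping):
    """Unserializes a sub-product only when the downstream task accesses it."""

    def __init__(self, products, unserializer):
        self._products = products  # e.g. {'data': File(...), 'nb': File(...)}
        self._unserializer = unserializer
        self._cache = {}

    def __getitem__(self, key):
        if key not in self._cache:
            self._cache[key] = self._unserializer(self._products[key])
        return self._cache[key]

    def __iter__(self):
        return iter(self._products)

    def __len__(self):
        return len(self._products)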

@edublancas

hi @marr75, sorry I missed this! Looks like you found a workaround and things are working for you, right?

I agree that the lazy loading is the best option but it'd involve more work. Ignoring the notebook product is a simpler alternative. I'll discuss with the team and see if we can allocate some resources to this. But if you have time, feel free to submit a PR.

Since you don't care about the output notebook, you could switch to the ScriptRunner: this task executes a notebook/script but doesn't generate an output notebook (however, it doesn't support magics).
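
Roughly, the pipeline.yaml entry would look like this (untested sketch, assuming you point it at the jupytext-paired .py version of your script):

tasks:
  - source: scripts/resolve-columns.py
    name: resolve-columns
    class: ScriptRunner
    upstream: [load-components]
    product:
      data: output/resolve-columns.parquet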

I noticed the following in your pipeline.yaml:

# notebook task, uses jupysql which was poorly supported in the .py notebook(?)

Can you open another issue and provide more details? My guess is that the problem is with jupytext and not with ploomber itself. jupysql powers some of our internal pipelines, and I remember encountering issues because jupytext failed to do a roundtrip conversion of scripts with %%sql magics. We fixed it in another project by telling jupytext to ignore the %%sql magic, so maybe the same solution could be applied here.
