I'm working on a reference ploomber pipeline for my team on and off, and I've hit an issue when using serializer/unserializer (which I think should be the default practice to support IoC in pipelines) together with a mix of notebook/script tasks and Python source tasks.
pipeline.yaml as follows:
```yaml
meta:
  extract_upstream: false  # this is a workaround for another oddity of how upstreams in notebooks vs source tasks work

serializer: common.serialization.default_serializer  # just a mapping of the default ploomber serializers + HDFS
unserializer: common.serialization.default_deserializer  # just a mapping of the default ploomber deserializers + HDFS

tasks:
  - source: sql/load-components.sql  # simple SQLDump task
    name: load-components
    client: common.clients.get_mysidewalk_client  # gets a connection to a specific database
    chunksize: None  # chunksize is poorly supported on the downstream so I am skipping it for now
    product: output/load-components.parquet

  - source: scripts/resolve-columns.ipynb  # notebook task, uses jupysql which was poorly supported in the .py notebook(?)
    static_analysis: disable
    upstream: [load-components]
    product:
      data: output/resolve-columns.parquet
      # required product, since I already execute from the ipynb and would likely want to commit
      # the .py version from jupytext for PRs and review, I didn't want to make another .ipynb version
      nb: reports/resolve-columns.html

  # The following 2 tasks replace the notebook task above with the exact same input and output,
  # but using python source tasks and sqlalchemy instead of ipynb tasks and jupysql
  - source: tasks.columns.unpack
    name: unpack-columns
    upstream: [load-components]
    product: output/unpack-columns.parquet

  - source: tasks.columns.resolve
    name: resolve-columns-2
    upstream: [unpack-columns]
    product: output/resolve-columns-2.parquet
  # end replacement tasks

  - source: tasks.polygon_features.unpack
    name: unpack-polygon-features
    upstream: [load-components]
    product: output/unpack-polygon-features.parquet

  - source: tasks.polygon_features.resolve
    name: resolve-polygon-features
    upstream: [unpack-polygon-features]
    product: output/resolve-polygon-features.parquet

  # this task will fail if resolve-columns is upstream but succeed with resolve-columns-2 upstream
  - source: tasks.join.combine_data
    name: combine-data
    upstream: [load-components, resolve-columns, resolve-polygon-features]
    product: output/combine-data.parquet
```
right now, combine-data is just:
```python
import pandas


def combine_data(upstream):
    components = upstream["load-components"]
    columns = upstream["resolve-columns"]["data"]  # works fine as upstream["resolve-columns-2"]
    features = upstream["resolve-polygon-features"]
    return pandas.DataFrame()
```
It executes with no issue when resolve-columns-2 is the upstream. It produces the following error when resolve-columns is the upstream:
```
Loading pipeline...
Building task 'combine-data': 100%|██████████| 1/1 [00:03<00:00, 3.02s/it]
=============================== DAG build failed ===============================
---- PythonCallable: combine-data -> File('output/combine-data.parquet') ----
---- ~/development/repos/gpt-experiments/callout_preamble/tasks/join.py:4 ----
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 554, in _build
    res = self._run()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 663, in _run
    self.run()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 235, in run
    params = _unserialize_params(self.params, self._unserializer)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 47, in _unserialize_params
    params["upstream"] = {
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 48, in <dictcomp>
    k: _unserializer(v, unserializer) for k, v in params["upstream"].items()
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/tasks.py", line 38, in _unserializer
    return unserializer(product)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/util/dotted_path.py", line 90, in __call__
    out = self._callable(*args, **kwargs_final)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 135, in wrapper
    return {
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 136, in <dictcomp>
    key: _unserialize_product(value, extension_mapping_final,
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 191, in _unserialize_product
    return unserializer(product)
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/io/unserialize.py", line 164, in _unserialize
    obj = fn(f)
_pickle.UnpicklingError: invalid load key, '<'.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "~/mambaforge/envs/gpt-experiments/lib/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "~/mambaforge/envs/gpt-experiments/lib/site-packages/ploomber/tasks/abc.py", line 573, in _build
    raise TaskBuildError(msg) from e
ploomber.exceptions.TaskBuildError: Error building task "combine-data"
"""
ploomber.exceptions.TaskBuildError: Error building task "combine-data"
=============================== Summary (1 task) ===============================
PythonCallable: combine-data -> File('output//combine-data.parquet')
=============================== DAG build failed ===============================
Need help? https://ploomber.io/community
```
I'm fairly certain the error is related to trying to use pickle to unserialize the 'nb' sub-product of the resolve-columns upstream (unpickling and HTML aren't friends). I can work around it by not using notebooks, but this would really limit adoption in my org if notebooks were never an option, or if notebooks couldn't be used with Python callables and IoC features.
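As a sanity check on that diagnosis: `<` is the first byte of an HTML document, and feeding HTML bytes to pickle reproduces the exact exception from the traceback:

```python
import pickle

# pickle reads the first byte as an opcode; '<' (0x3c) isn't a valid one,
# which is exactly what happens when the HTML 'nb' product gets unpickled
pickle.loads(b"<!DOCTYPE html><html></html>")
# _pickle.UnpicklingError: invalid load key, '<'.
```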
Suggestion: create a special top-level key for the notebook (outside of the product) or skip the 'nb' sub-product in deserialization.
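One user-side way to approximate that today might be to map `.html` in the unserializer so the rendered notebook is never unpickled. A sketch, assuming extension-mapping entries receive the product path (returning the path here is just a stand-in for "don't load it"):

```python
import pickle

from ploomber.io import unserializer


@unserializer({".html": lambda product: str(product)})
def default_deserializer(product):
    # .html products (the rendered notebook) resolve to their path;
    # everything else keeps the pickle fallback as before
    with open(str(product), "rb") as f:
        return pickle.load(f)
```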
Better suggestion: lazy-load the products of any upstream; that way, upstreams with multiple products that are only partially consumed by downstream tasks don't incur unnecessary deserialization and memory overhead, and peripheral products like 'nb' are never deserialized at all.
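To make the lazy-loading idea concrete, the `upstream` argument could be a mapping that only unserializes a product when the task actually accesses it; a rough sketch (all names hypothetical, not ploomber internals):

```python
from collections.abc import Mapping


class LazyUpstream(Mapping):
    """Upstream view that defers unserialization until first access."""

    def __init__(self, products, unserializer):
        self._products = products  # task name -> product location(s)
        self._unserializer = unserializer
        self._loaded = {}

    def __getitem__(self, key):
        # peripheral products (e.g. an 'nb' HTML report) that a downstream
        # task never touches would never be deserialized at all
        if key not in self._loaded:
            self._loaded[key] = self._unserializer(self._products[key])
        return self._loaded[key]

    def __iter__(self):
        return iter(self._products)

    def __len__(self):
        return len(self._products)
```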
hi @marr75, sorry I missed this! Looks like you found a workaround and things are working for you, right?
I agree that the lazy loading is the best option but it'd involve more work. Ignoring the notebook product is a simpler alternative. I'll discuss with the team and see if we can allocate some resources to this. But if you have time, feel free to submit a PR.
Since you don't care about the output notebook, you could switch to the ScriptRunner (scroll down to see a sample pipeline.yaml). This task executes a notebook/script but doesn't generate an output notebook (however, it doesn't support magics); a sample spec is sketched below.
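A minimal task spec for it might look like this (a sketch adapted from the pipeline above; `class` is the pipeline.yaml key for overriding the task class, and the `.py` source is an assumption since ScriptRunner targets scripts):

```yaml
tasks:
  - source: scripts/resolve-columns.py
    class: ScriptRunner  # runs the script; no output notebook is produced
    upstream: [load-components]
    product:
      data: output/resolve-columns.parquet
```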
I noticed the following in your pipeline.yaml:
```yaml
# notebook task, uses jupysql which was poorly supported in the .py notebook(?)
```
Can you open another issue and provide more details? My guess is that the problem is with jupytext and not with ploomber itself. jupysql powers some of our internal pipelines, and I remember hitting issues because jupytext failed to do a roundtrip conversion of scripts with %%sql magics; we fixed it in another project by telling jupytext to ignore the %%sql magic, so maybe the same solution applies here.
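If it's the same roundtrip issue, the relevant jupytext setting is likely `comment_magics`, e.g. in a project-level jupytext.toml (whether this is sufficient for %%sql in this particular setup is an assumption worth verifying):

```toml
# jupytext.toml: keep magics such as %%sql as-is instead of commenting
# them out during .ipynb <-> .py conversion
comment_magics = false
```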
Related to #1088