
[MRG] Don't scatter data inside dask workers #1061

Merged
merged 9 commits into joblib:master on Jul 1, 2020

Conversation

pierreglaser
Contributor

Workaround for dask/distributed#3703, which affects the joblib-dask integration.
When scattering data from inside a dask worker, tasks that use this data can often end up being cancelled; see the referenced issue for more information.

This situation typically happens in joblib during nested Parallel calls (when the inner Parallel call scatters some data).

As suggested by @mrocklin, another workaround is to use hash=False inside client.scatter calls -- I'd like to compare the two solutions via a few benchmarks before merging this PR.

Related to, but does not entirely fix, #959.

This also needs tests.
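For context, here is a minimal sketch of the nested-Parallel pattern that triggers auto-scattering from inside a dask worker. This is not code from the PR; the function names, array size and threaded LocalCluster are illustrative assumptions.

import numpy as np
from distributed import Client
from joblib import Parallel, delayed, parallel_backend


def inner(chunk, i):
    # `chunk` is large, so the dask backend auto-scatters it; before this
    # PR that scatter call could run from inside a worker task.
    return chunk.sum() + i


def outer(chunk):
    # Nested Parallel call: this body executes inside a dask worker.
    return Parallel(n_jobs=2)(delayed(inner)(chunk, i) for i in range(4))


if __name__ == "__main__":
    client = Client(processes=False)  # threaded local cluster, for the sketch only
    data = np.ones(int(1e6))          # large enough to trigger auto-scattering
    with parallel_backend("dask"):
        print(Parallel(n_jobs=2)(delayed(outer)(data) for _ in range(2)))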

Commit note: stop being polluted by unrelated KeyErrors coming from failed call_data_futures lookups.
@codecov

codecov bot commented Jun 7, 2020

Codecov Report

Merging #1061 into master will decrease coverage by 0.05%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master    #1061      +/-   ##
==========================================
- Coverage   94.45%   94.39%   -0.06%     
==========================================
  Files          47       47              
  Lines        6889     6908      +19     
==========================================
+ Hits         6507     6521      +14     
- Misses        382      387       +5     
Impacted Files                 Coverage Δ
joblib/_dask.py                94.47% <100.00%> (+0.56%) ⬆️
joblib/test/test_dask.py       98.87% <100.00%> (+0.07%) ⬆️
joblib/test/testutils.py       50.00% <0.00%> (-50.00%) ⬇️
joblib/_parallel_backends.py   93.75% <0.00%> (-1.18%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 9d389e0...04d8188.

Contributor

@mrocklin mrocklin left a comment


Thanks for this @pierreglaser

I think that it is probably more efficient to scatter with hash=False. Otherwise the worker will try to push the local data up to the scheduler every time, which will be inefficient. Using hash=False should avoid the naming collisions though.
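For reference, a small sketch of that alternative workaround; the data and local cluster below are made up, the point is only the hash keyword of Client.scatter.

from distributed import Client

client = Client(processes=False)
data = list(range(1_000_000))

# hash=True (the default) derives the future's key from the data's content,
# so scattering equal data twice reuses the same key and can collide with a
# future that is being cancelled; hash=False generates a fresh random key
# each time, which avoids the collision.
[fut] = client.scatter([data], hash=False)
print(fut.key)  # a random token rather than a content hash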

joblib/_dask.py Outdated
Comment on lines 48 to 52
    try:
        worker = get_worker()
    except ValueError:
        worker = None
    return worker

This is probably a little bit more robust.

Suggested change
-    try:
-        worker = get_worker()
-    except ValueError:
-        worker = None
-    return worker
+    from distributed.utils import thread_state
+    return hasattr(thread_state, "execution_state")
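To make the suggestion concrete, here is a hedged sketch of how such a check could gate auto-scattering. The helper names are invented for illustration and are not the PR's actual code.

from distributed.utils import thread_state


def _running_inside_dask_worker():
    # thread_state.execution_state is only set on the threads that execute
    # tasks on a worker, so this is True exactly when we are inside a task.
    return hasattr(thread_state, "execution_state")


def maybe_scatter(client, arg):
    if _running_inside_dask_worker():
        # Pass the argument through untouched: scattering from inside a
        # worker is what triggers dask/distributed#3703.
        return arg
    [fut] = client.scatter([arg])
    return fut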

@ogrisel
Contributor

ogrisel commented Jun 9, 2020

I am also worried about sending large data chunks to the scheduler process.

Although I do not fully understand the impact of hash=True and why we get naming collisions, I think it would be worth implementing this solution (using hash=False instead of hash=True).

Alongside this, we might also want to add an option to the joblib dask backend to choose between global nested parallelism (submitting nested calls to the scheduler, as we currently do) and a local nested variant, as suggested by @mrocklin in dask/distributed#3703 (comment). A rough sketch of the local variant follows below.

Maybe the local variant should be active by default, as it is likely to have lower overhead in 99% of cases, but it can potentially lead to under-subscription of the cluster in some rare cases.
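The backend option discussed above does not exist; as a rough illustration only, the "local nested variant" amounts to something like forcing a local backend for the inner calls inside the worker task (a sketch, not joblib's implementation):

from joblib import Parallel, delayed, parallel_backend


def outer(chunks):
    # Run the nested calls with local threads on the worker instead of
    # submitting them back to the dask scheduler.
    with parallel_backend("threading", n_jobs=4):
        return Parallel()(delayed(sum)(chunk) for chunk in chunks)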

@mrocklin
Contributor

mrocklin commented Jun 9, 2020 via email

@ogrisel
Contributor

ogrisel commented Jul 1, 2020

It worked!

@pierreglaser can you please update the changelog?

@ogrisel ogrisel changed the title [NoMRG] Don't scatter data inside dask workers [MRG] Don't scatter data inside dask workers Jul 1, 2020
@pierreglaser
Contributor Author

Sure.

joblib/test/test_dask.py (outdated review thread, resolved)
joblib/_dask.py (outdated review thread, resolved)
@@ -269,15 +269,22 @@ async def maybe_to_futures(args):
            try:
                f = call_data_futures[arg]
            except KeyError:
                pass
Contributor Author

Now, we won't get confusing KeyErrors if something goes wrong during scattering :)
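A hedged sketch of that lookup pattern, with a plain dict standing in for the backend's cache; this is not the PR's exact code.

_SENTINEL = object()
call_data_futures = {}  # stand-in for the backend's weak-reference cache


def lookup_or_create(arg, create):
    f = call_data_futures.get(arg, _SENTINEL)
    if f is _SENTINEL:
        # Cache miss handled explicitly: an exception raised by `create`
        # (e.g. while scattering) now propagates with its real cause
        # instead of surfacing as a confusing KeyError.
        f = create(arg)
        call_data_futures[arg] = f
    return f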

joblib/test/test_dask.py (outdated review thread, resolved)
@ogrisel ogrisel merged commit 54109d9 into joblib:master Jul 1, 2020
@ogrisel
Contributor

ogrisel commented Jul 1, 2020

Merged! Thank you very much @pierreglaser and @mrocklin.

@mrocklin
Contributor

mrocklin commented Jul 1, 2020

Great. I am glad to see this. I'm also curious what performance looks like now as a result of this change. Are there other issues that we should resolve?

bmwiedemann added a commit to bmwiedemann/openSUSE that referenced this pull request Jul 18, 2020
https://build.opensuse.org/request/show/821624
by user dirkmueller + dimstar_suse
- update to 0.16.0
  - Fix a problem in the constructors of Parallel backend classes that
    inherit from `AutoBatchingMixin`, which prevented the dask backend from
    properly batching short tasks.
    joblib/joblib#1062
  - Fix a problem in the way the joblib dask backend batches calls that would
    badly interact with the dask callable pickling cache and lead to wrong
    results or errors.
    joblib/joblib#1055
  - Prevent a dask.distributed bug from surfacing in joblib's dask backend
    during nested Parallel calls (due to joblib's auto-scattering feature)
    joblib/joblib#1061
  - Workaround for a race condition after Parallel calls with the dask backend
    that would cause low-level warnings from asyncio.