[MRG] Don't scatter data inside dask workers #1061
Conversation
In order to stop being polluted by unrelated KeyErrors coming from call_data_futures lookup errors.
Codecov Report
@@ Coverage Diff @@
## master #1061 +/- ##
==========================================
- Coverage 94.45% 94.39% -0.06%
==========================================
Files 47 47
Lines 6889 6908 +19
==========================================
+ Hits 6507 6521 +14
- Misses 382 387 +5
Continue to review full report at Codecov.
Thanks for this @pierreglaser.
I think that it is probably more efficient to scatter with hash=False. Otherwise the worker will try to push the local data up to the scheduler every time, which will be inefficient. Using hash=False should avoid the naming collisions though.
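For reference, a minimal sketch of the suggested call with a plain distributed Client (the client, array, and variable names are purely illustrative and not joblib's actual code):

# Illustrative only: scatter with hash=False so the resulting future gets a
# unique random key instead of a key derived from the data's content.
import numpy as np
from distributed import Client

client = Client()  # assumes a throwaway local cluster for the example
data = np.arange(1_000_000)

[data_future] = client.scatter([data], hash=False)
total = client.submit(np.sum, data_future).result()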
joblib/_dask.py
Outdated
try:
    worker = get_worker()
except ValueError:
    worker = None
return worker
This is probably a little bit more robust.
Suggested change:
-try:
-    worker = get_worker()
-except ValueError:
-    worker = None
-return worker
+from distributed.utils import thread_state
+return hasattr(thread_state, "execution_state")
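For completeness, a hedged sketch contrasting the two detection approaches as standalone helpers (the helper names are made up; this is not necessarily the final joblib code):

from distributed import get_worker
from distributed.utils import thread_state

def inside_dask_worker_via_get_worker():
    # Original approach: get_worker() raises ValueError when called from
    # outside a dask worker thread.
    try:
        get_worker()
    except ValueError:
        return False
    return True

def inside_dask_worker_via_thread_state():
    # Suggested approach: dask workers set thread_state.execution_state on
    # the threads that execute tasks, so an attribute check is enough.
    return hasattr(thread_state, "execution_state")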
I am also worried by sending large data chunks to the scheduler process. Although I do not fully understand the impact of hash=True and why we get naming collisions, I think it would be worth implementing this solution.

Alongside, we might also want to implement an option in the joblib dask backend to choose between global nested parallelism (by submitting nested calls to the scheduler as we currently do) and a local nested variant as suggested by @mrocklin in dask/distributed#3703 (comment). Maybe the local variant should be active by default, as it's likely to be lower overhead in 99% of the cases but can potentially lead to under-subscription of the cluster in some rare cases.
hash=False should work well in our case I think. When you call client.scatter within a worker, all you do is put the array into the worker's .data dictionary. So this is free and simple. The data is very likely to only stay there and not be replicated, so I think that the benefits of hashing for deduplication are low, while the cost of hashing every array on every task is likely to be somewhat high. I suspect/hope that not hashing resolves the problems that we're seeing and leads us to near optimal behavior.
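A small hedged illustration of that trade-off against a throwaway local cluster (purely for demonstration; not part of the PR):

# hash=True derives the future's key from the data's content: scattering the
# same data twice yields the same key (deduplication), at the cost of hashing
# on every call. hash=False uses a random key: no hashing cost, no dedup.
import numpy as np
from distributed import Client

client = Client()
x = np.arange(10)

[a] = client.scatter([x], hash=True)
[b] = client.scatter([x], hash=True)
assert a.key == b.key   # content-hashed keys are deduplicated

[c] = client.scatter([x], hash=False)
[d] = client.scatter([x], hash=False)
assert c.key != d.key   # random keys give independent futures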
It worked! @pierreglaser can you please update the changelog?
Sure.
@@ -269,15 +269,22 @@ async def maybe_to_futures(args):
    try:
        f = call_data_futures[arg]
    except KeyError:
        pass
Now we won't get confusing KeyErrors if something goes wrong during scattering :)
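To make the intent concrete, here is a simplified sketch of the pattern (not the actual joblib code; the helper name and the hash=False choice are illustrative): keep the try/except tight around the cache lookup so errors raised while scattering are not misreported as a KeyError from the lookup.

async def maybe_to_future(client, call_data_futures, arg):
    try:
        return call_data_futures[arg]
    except KeyError:
        pass  # not cached yet: fall through instead of scattering in here
    # Any error raised during scattering now surfaces as itself rather than
    # being confused with the KeyError of the cache lookup above.
    [future] = await client.scatter([arg], asynchronous=True, hash=False)
    call_data_futures[arg] = future
    return future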
Merged! Thank you very much @pierreglaser and @mrocklin.
Great, I am glad to see this. I'm also curious what performance is like now as a result of this. Are there other issues that we should resolve?
https://build.opensuse.org/request/show/821624 by user dirkmueller + dimstar_suse

- update to 0.16.0:
  - Fix a problem in the constructors of Parallel backend classes that inherit from the `AutoBatchingMixin` that prevented the dask backend from properly batching short tasks. joblib/joblib#1062
  - Fix a problem in the way the joblib dask backend batches calls that would badly interact with the dask callable pickling cache and lead to wrong results or errors. joblib/joblib#1055
  - Prevent a dask.distributed bug from surfacing in joblib's dask backend during nested Parallel calls (due to joblib's auto-scattering feature). joblib/joblib#1061
  - Workaround for a race condition after Parallel calls with the dask backend that would cause low level warnings from asyncio.
Workaround for dask/distributed#3703, which affects the joblib-dask integration.

When scattering data inside a dask worker, tasks using this data can often end up being cancelled; see the referenced issue for more information. This situation typically happens inside joblib during nested Parallel calls (when the inner parallel call scatters some data).

As suggested by @mrocklin, another workaround is to use hash=False inside client.scatter calls -- I'd like to compare the two solutions via a few benchmarks before merging this PR.

Related to, but does not entirely fix, #959.
This also needs tests.
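For context, a hedged sketch of the nested-Parallel pattern that triggers the auto-scattering described above (function names, sizes, and structure are illustrative; whether a given argument actually gets auto-scattered depends on joblib's size heuristics):

import numpy as np
from distributed import Client
from joblib import Parallel, delayed, parallel_backend

def inner(i, big_array):
    # Runs on a dask worker; big_array may be auto-scattered by joblib.
    return float(big_array[i])

def outer(big_array):
    # Also runs on a dask worker: the nested Parallel call below is where
    # scattering from inside a worker used to happen.
    with parallel_backend("dask"):
        return Parallel()(delayed(inner)(i, big_array) for i in range(4))

if __name__ == "__main__":
    client = Client()
    big_array = np.random.rand(1_000_000)
    with parallel_backend("dask"):
        results = Parallel()(delayed(outer)(big_array) for _ in range(2))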