
Update Dask backend #1411

Merged · 17 commits · Apr 12, 2023
Changes from 9 commits
38 changes: 20 additions & 18 deletions examples/parallel/distributed_backend_simple.py
@@ -1,32 +1,36 @@
"""
-Using dask distributed for single-machine parallel computing
-=============================================================
+Using Dask for single-machine parallel computing
+================================================

-This example shows the simplest usage of the dask `distributed
-<https://distributed.readthedocs.io>`__ backend, on the local computer.
+This example shows the simplest usage of the
+`Dask <https://docs.dask.org/en/stable/>`_
+backend on your local machine.

This is useful for prototyping a solution, to later be run on a truly
-distributed cluster, as the only change to be made is the address of the
-scheduler.
+`distributed Dask cluster
+<https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_,
+as the only change needed is the cluster class.

Another realistic usage scenario: combining dask code with joblib code,
for instance using dask for preprocessing data, and scikit-learn for
machine learning. In such a setting, it may be interesting to use
distributed as a backend scheduler for both dask and joblib, to
-orchestrate well the computation.
+orchestrate the computation.

"""

###############################################################################
# Setup the distributed client
###############################################################################
-from dask.distributed import Client
+from dask.distributed import Client, LocalCluster

-# If you have a remote cluster running Dask
-# client = Client('tcp://scheduler-address:8786')
+# replace with whichever cluster class you're using
+cluster = LocalCluster()
+# connect client to your cluster
+client = Client(cluster)

-# If you want Dask to set itself up on your personal computer
-client = Client(processes=False)
+# Monitor your computation with the Dask dashboard
+print(client.dashboard_link)

###############################################################################
# Run parallel computation using dask.distributed
@@ -37,18 +41,16 @@


def long_running_function(i):
-    time.sleep(.1)
+    time.sleep(0.1)
    return i


###############################################################################
# The verbose messages below show that the backend is indeed the
# dask.distributed one
-with joblib.parallel_backend('dask'):
+with joblib.parallel_backend("dask"):
    joblib.Parallel(verbose=100)(
-        joblib.delayed(long_running_function)(i)
-        for i in range(10))
+        joblib.delayed(long_running_function)(i) for i in range(10)
+    )

###############################################################################
# Progress in computation can be followed on the distributed web
# interface, see https://dask.pydata.org/en/latest/diagnostics-distributed.html
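
As the updated docstring promises, moving this example from a LocalCluster to a real deployment only swaps the cluster object the client connects to. A minimal sketch of that swap, connecting to an already-running scheduler; the address below is a placeholder for your own deployment, not something this PR sets up:

import time

import joblib
from dask.distributed import Client


def long_running_function(i):
    time.sleep(0.1)
    return i


# Point the client at an existing scheduler instead of creating a
# LocalCluster; replace the placeholder address with your own.
client = Client("tcp://scheduler-address:8786")

with joblib.parallel_backend("dask"):
    joblib.Parallel(verbose=100)(
        joblib.delayed(long_running_function)(i) for i in range(10)
    )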
18 changes: 14 additions & 4 deletions joblib/parallel.py
@@ -152,10 +152,20 @@ class parallel_backend(object):
multiprocessing and loky may not be available, in which case joblib
defaults to threading.

-In addition, if the `dask` and `distributed` Python packages are installed,
-it is possible to use the 'dask' backend for better scheduling of nested
-parallel calls without over-subscription and potentially distribute
-parallel calls over a networked cluster of several hosts.
+You can also use the `Dask <https://docs.dask.org/en/stable/>`_ joblib
+backend to distribute work across machines. This works well with
+scikit-learn estimators with the ``n_jobs`` parameter, for example::

+>>> import joblib  # doctest: +SKIP
+>>> from sklearn.model_selection import GridSearchCV  # doctest: +SKIP
+>>> from dask.distributed import Client  # doctest: +SKIP
+
+>>> # create a local Dask cluster
+>>> client = Client()  # doctest: +SKIP
+>>> grid_search = GridSearchCV(estimator, param_grid, n_jobs=-1)
+... # doctest: +SKIP
+>>> with joblib.parallel_backend("dask", scatter=[X, y]):  # doctest: +SKIP
+...     grid_search.fit(X, y)

It is also possible to use the distributed 'ray' backend for distributing
the workload to a cluster of nodes. To use the 'ray' joblib backend add
the following lines:
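
A minimal sketch of that setup, assuming the `ray` package is installed (the `register_ray` helper lives in `ray.util.joblib`):

import joblib
from ray.util.joblib import register_ray

# Register the 'ray' backend with joblib before requesting it.
register_ray()

with joblib.parallel_backend("ray"):
    results = joblib.Parallel(n_jobs=-1)(
        joblib.delayed(abs)(-i) for i in range(5)
    )
print(results)  # [0, 1, 2, 3, 4]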