Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEA parallel_config context manager to allow more fine-grained control #1392

Merged
merged 19 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions joblib/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ def eval_(node):
return operators[type(node.op)](eval_(node.operand))
else:
raise TypeError(node)


class _Sentinel:
def __init__(self, default_value):
self.default_value = default_value

def __repr__(self):
return f"default({self.default_value!r})"
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
309 changes: 232 additions & 77 deletions joblib/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
ThreadingBackend, SequentialBackend,
LokyBackend)
from .externals.cloudpickle import dumps, loads
from ._utils import eval_expr
from ._utils import eval_expr, _Sentinel

# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
Expand Down Expand Up @@ -64,9 +64,6 @@
# manager
_backend = threading.local()

VALID_BACKEND_HINTS = ('processes', 'threads', None)
VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)


def _register_dask():
""" Register Dask Backend if called with parallel_backend("dask") """
Expand All @@ -86,23 +83,86 @@ def _register_dask():
}


# Sentinels for the default values of the Parallel constructor and
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
# the parallel_config and parallel_backend context managers
default_parallel_config_params = {
"n_jobs": _Sentinel(default_value=None),
"verbose": _Sentinel(default_value=0),
"temp_folder": _Sentinel(default_value=None),
"max_nbytes": _Sentinel(default_value="1M"),
"mmap_mode": _Sentinel(default_value="r"),
"prefer": _Sentinel(default_value=None),
"require": _Sentinel(default_value=None),
}


VALID_BACKEND_HINTS = (
'processes', 'threads', default_parallel_config_params["prefer"], None
)
VALID_BACKEND_CONSTRAINTS = (
'sharedmem', default_parallel_config_params["require"], None
)


def _get_config_param(param, context_config, key):
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
if param is not default_parallel_config_params[key]:
# param is explicitely set, return it
return param

if context_config is None or key not in context_config:
# no context manager just return the default value.
return param.default_value

if context_config[key] is not default_parallel_config_params[key]:
# there's a context manager and the key is set, return it
return context_config[key]

# there's a context manager but the key is not set, return the default
return param.default_value


def get_active_backend(prefer=None, require=None, verbose=0):
"""Return the active default backend"""
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
backend, config = _get_active_backend(prefer, require, verbose)
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
return backend, config["n_jobs"]


def _get_active_backend(
prefer=default_parallel_config_params["prefer"],
require=default_parallel_config_params["require"],
verbose=default_parallel_config_params["verbose"],
):
"""Return the active default backend"""
if prefer not in VALID_BACKEND_HINTS:
raise ValueError("prefer=%r is not a valid backend hint, "
"expected one of %r" % (prefer, VALID_BACKEND_HINTS))
raise ValueError(
f"prefer={prefer} is not a valid backend hint, "
f"expected one of {VALID_BACKEND_HINTS}"
)
if require not in VALID_BACKEND_CONSTRAINTS:
raise ValueError("require=%r is not a valid backend constraint, "
"expected one of %r"
% (require, VALID_BACKEND_CONSTRAINTS))

raise ValueError(
f"require={require} is not a valid backend constraint, "
f"expected one of {VALID_BACKEND_CONSTRAINTS}"
)
if prefer == 'processes' and require == 'sharedmem':
raise ValueError("prefer == 'processes' and require == 'sharedmem'"
" are inconsistent settings")
backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
if backend_and_jobs is not None:
raise ValueError(
"prefer == 'processes' and require == 'sharedmem'"
" are inconsistent settings"
)

backend_config = getattr(_backend, 'config', None)
if backend_config is not None:
backend = backend_config['backend']
config = {k: v for k, v in backend_config.items() if k != 'backend'}
else:
backend, config = None, None

prefer = _get_config_param(prefer, config, "prefer")
require = _get_config_param(require, config, "require")
verbose = _get_config_param(verbose, config, "verbose")

if backend is not None:
# Try to use the backend set by the user with the context manager.
backend, n_jobs = backend_and_jobs

nesting_level = backend.nesting_level
supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
if require == 'sharedmem' and not supports_sharedmem:
Expand All @@ -115,11 +175,12 @@ def get_active_backend(prefer=None, require=None, verbose=0):
"as the latter does not provide shared memory semantics."
% (sharedmem_backend.__class__.__name__,
backend.__class__.__name__))
return sharedmem_backend, DEFAULT_N_JOBS
return sharedmem_backend, config
else:
return backend_and_jobs
return backend, config

# We are outside of the scope of any parallel_backend context manager,
# We are either outside of the scope of any parallel_(config/backend)
# context manager or the context manager did not set a backend.
# create the default backend instance now.
backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
Expand All @@ -129,10 +190,101 @@ def get_active_backend(prefer=None, require=None, verbose=0):
# Make sure the selected default backend match the soft hints and
# hard constraints:
backend = BACKENDS[DEFAULT_THREAD_BACKEND](nesting_level=0)
return backend, DEFAULT_N_JOBS

if config is not None:
return backend, config
return backend, {"n_jobs": DEFAULT_N_JOBS}


class parallel_config:
def __init__(
self,
backend=None,
n_jobs=default_parallel_config_params["n_jobs"],
verbose=default_parallel_config_params["verbose"],
temp_folder=default_parallel_config_params["temp_folder"],
max_nbytes=default_parallel_config_params["max_nbytes"],
mmap_mode=default_parallel_config_params["mmap_mode"],
prefer=default_parallel_config_params["prefer"],
require=default_parallel_config_params["require"],
inner_max_num_threads=None,
**backend_params
):
tomMoral marked this conversation as resolved.
Show resolved Hide resolved
self.old_parallel_config = getattr(_backend, "config", None)

backend = self._check_backend(
backend, inner_max_num_threads, **backend_params
)

# Save the parallel info and set the active parallel config
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
self.new_parallel_config = {
"backend": backend,
"n_jobs": n_jobs,
"verbose": verbose,
"temp_folder": temp_folder,
"max_nbytes": max_nbytes,
"mmap_mode": mmap_mode,
"prefer": prefer,
"require": require,
}
setattr(_backend, "config", self.new_parallel_config)

def _check_backend(self, backend, inner_max_num_threads, **backend_params):
if backend is None:
return None

if isinstance(backend, str):
if backend not in BACKENDS:
tomMoral marked this conversation as resolved.
Show resolved Hide resolved
if backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
elif backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
raise ValueError("Invalid backend: %s, expected one of %r"
% (backend, sorted(BACKENDS.keys())))
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved

backend = BACKENDS[backend](**backend_params)

if inner_max_num_threads is not None:
msg = ("{} does not accept setting the inner_max_num_threads "
"argument.".format(backend.__class__.__name__))
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved
assert backend.supports_inner_max_num_threads, msg
tomMoral marked this conversation as resolved.
Show resolved Hide resolved
backend.inner_max_num_threads = inner_max_num_threads

# If the nesting_level of the backend is not set previously, use the
# nesting level from the previous active_backend to set it
if backend.nesting_level is None:
if self.old_parallel_config is None:
nesting_level = 0
else:
nesting_level = (
self.old_parallel_config["backend"].nesting_level
)
backend.nesting_level = nesting_level

class parallel_backend(object):
return backend

def __enter__(self):
return self.new_parallel_config

def __exit__(self, type, value, traceback):
self.unregister()

def unregister(self):
if self.old_parallel_config is None:
if getattr(_backend, "config", None) is not None:
delattr(_backend, "config")
else:
setattr(_backend, "config", self.old_parallel_config)


class parallel_backend(parallel_config):
"""Change the default backend used by Parallel inside a with block.

If ``backend`` is a string it must match a previously registered
Expand Down Expand Up @@ -198,60 +350,36 @@ class parallel_backend(object):
"""
def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
**backend_params):
if isinstance(backend, str):
if backend not in BACKENDS:
if backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
elif backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
raise ValueError("Invalid backend: %s, expected one of %r"
% (backend, sorted(BACKENDS.keys())))

backend = BACKENDS[backend](**backend_params)

if inner_max_num_threads is not None:
msg = ("{} does not accept setting the inner_max_num_threads "
"argument.".format(backend.__class__.__name__))
assert backend.supports_inner_max_num_threads, msg
backend.inner_max_num_threads = inner_max_num_threads

# If the nesting_level of the backend is not set previously, use the
# nesting level from the previous active_backend to set it
current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
if backend.nesting_level is None:
if current_backend_and_jobs is None:
nesting_level = 0
else:
nesting_level = current_backend_and_jobs[0].nesting_level

backend.nesting_level = nesting_level

# Save the backends info and set the active backend
self.old_backend_and_jobs = current_backend_and_jobs
self.new_backend_and_jobs = (backend, n_jobs)
# warnings.warn(
# "The parallel_config context manager should be used instead of "
# "parallel_backend. The parallel_backend context manager might "
# "be removed in future versions of joblib.",
# FutureWarning,
# )
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved

super().__init__(
backend=backend,
n_jobs=n_jobs,
inner_max_num_threads=inner_max_num_threads,
**backend_params
)

_backend.backend_and_jobs = (backend, n_jobs)
if self.old_parallel_config is None:
self.old_backend_and_jobs = None
else:
self.old_backend_and_jobs = (
self.old_parallel_config["backend"],
self.old_parallel_config["n_jobs"],
)
self.new_backend_and_jobs = (
self.new_parallel_config["backend"],
self.new_parallel_config["n_jobs"],
)

def __enter__(self):
return self.new_backend_and_jobs

def __exit__(self, type, value, traceback):
self.unregister()

def unregister(self):
if self.old_backend_and_jobs is None:
if getattr(_backend, 'backend_and_jobs', None) is not None:
del _backend.backend_and_jobs
else:
_backend.backend_and_jobs = self.old_backend_and_jobs


# Under Linux or OS X the default start method of multiprocessing
# can cause third party libraries to crash. Under Python 3.4+ it is possible
Expand Down Expand Up @@ -666,29 +794,56 @@ class Parallel(Logger):
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished

'''
def __init__(self, n_jobs=None, backend=None, verbose=0, timeout=None,
pre_dispatch='2 * n_jobs', batch_size='auto',
temp_folder=None, max_nbytes='1M', mmap_mode='r',
prefer=None, require=None):
active_backend, context_n_jobs = get_active_backend(
def __init__(
self,
n_jobs=default_parallel_config_params["n_jobs"],
backend=None,
verbose=default_parallel_config_params["verbose"],
timeout=None,
pre_dispatch='2 * n_jobs',
batch_size='auto',
temp_folder=default_parallel_config_params["temp_folder"],
max_nbytes=default_parallel_config_params["max_nbytes"],
mmap_mode=default_parallel_config_params["mmap_mode"],
prefer=default_parallel_config_params["prefer"],
require=default_parallel_config_params["require"],
):
active_backend, context_config = _get_active_backend(
prefer=prefer, require=require, verbose=verbose)
jeremiedbb marked this conversation as resolved.
Show resolved Hide resolved

nesting_level = active_backend.nesting_level
if backend is None and n_jobs is None:

if backend is None and n_jobs is default_parallel_config_params["n_jobs"]:
# If we are under a parallel_backend context manager, look up
# the default number of jobs and use that instead:
n_jobs = context_n_jobs
if n_jobs is None:
n_jobs = context_config["n_jobs"]
if n_jobs is None or n_jobs is default_parallel_config_params["n_jobs"]:
# No specific context override and no specific value request:
# default to 1.
n_jobs = 1
self.n_jobs = n_jobs
self.verbose = verbose

if verbose is default_parallel_config_params["verbose"]:
self.verbose = verbose.default_value
else:
self.verbose = verbose

self.timeout = timeout
self.pre_dispatch = pre_dispatch
self._ready_batches = queue.Queue()
self._id = uuid4().hex
self._reducer_callback = None

# Check if we are under a parallel_config or parallel_backend
# context manager and use the config from the context manager
# for arguments that are not explicitly set.
max_nbytes = _get_config_param(max_nbytes, context_config, "max_nbytes")
temp_folder = _get_config_param(temp_folder, context_config, "temp_folder")
mmap_mode = _get_config_param(mmap_mode, context_config, "mmap_mode")
prefer = _get_config_param(prefer, context_config, "prefer")
require = _get_config_param(require, context_config, "require")
verbose = _get_config_param(verbose, context_config, "verbose")

if isinstance(max_nbytes, str):
max_nbytes = memstr_to_bytes(max_nbytes)

Expand Down