Skip to content

Commit

Permalink
Add option to specify the timeout.
Browse files Browse the repository at this point in the history
  • Loading branch information
shermansiu committed Apr 29, 2024
1 parent cc372d0 commit 612abac
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion tqdm/contrib/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs):
----------
tqdm_class : [default: tqdm.auto.tqdm].
max_workers : [default: min(32, cpu_count() + 4)].
timeout : [default: None].
chunksize : [default: 1].
lock_name : [default: "":str].
"""
Expand All @@ -42,13 +43,14 @@ def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs):
kwargs["total"] = length_hint(iterables[0])
tqdm_class = kwargs.pop("tqdm_class", tqdm_auto)
max_workers = kwargs.pop("max_workers", min(32, cpu_count() + 4))
timeout = kwargs.pop("timeout", None)
chunksize = kwargs.pop("chunksize", 1)
lock_name = kwargs.pop("lock_name", "")
with ensure_lock(tqdm_class, lock_name=lock_name) as lk:
# share lock in case workers are already using `tqdm`
with PoolExecutor(max_workers=max_workers, initializer=tqdm_class.set_lock,
initargs=(lk,)) as ex:
return list(tqdm_class(ex.map(fn, *iterables, chunksize=chunksize), **kwargs))
return list(tqdm_class(ex.map(fn, *iterables, timeout=timeout, chunksize=chunksize), **kwargs))


def thread_map(fn, *iterables, **tqdm_kwargs):
Expand All @@ -64,6 +66,10 @@ def thread_map(fn, *iterables, **tqdm_kwargs):
Maximum number of workers to spawn; passed to
`concurrent.futures.ThreadPoolExecutor.__init__`.
[default: max(32, cpu_count() + 4)].
timeout : int or float, optional
The iterator raises a TimeoutError if __next()__ is called and the
result isn't available within the timeout specified from the
original call to thread_map. [default: None].
"""
from concurrent.futures import ThreadPoolExecutor
return _executor_map(ThreadPoolExecutor, fn, *iterables, **tqdm_kwargs)
Expand All @@ -82,6 +88,10 @@ def process_map(fn, *iterables, **tqdm_kwargs):
Maximum number of workers to spawn; passed to
`concurrent.futures.ProcessPoolExecutor.__init__`.
[default: min(32, cpu_count() + 4)].
timeout : int or float, optional
The iterator raises a TimeoutError if __next()__ is called and the
result isn't available within the timeout specified from the
original call to process_map. [default: None].
chunksize : int, optional
Size of chunks sent to worker processes; passed to
`concurrent.futures.ProcessPoolExecutor.map`. [default: 1].
Expand Down

0 comments on commit 612abac

Please sign in to comment.