Skip to content

Commit

Permalink
Merge pull request #9097 from peterbell10/parallel-task-sleep
Browse files Browse the repository at this point in the history
Remove unnecesary sleep when submitting parallel tasks
  • Loading branch information
tk0miya committed Apr 17, 2021
2 parents 9f44ee4 + 33b8f43 commit af53987
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions sphinx/util/parallel.py
Expand Up @@ -105,7 +105,8 @@ def add_task(self, task_func: Callable, arg: Any = None, result_func: Callable =
def join(self) -> None:
try:
while self._pworking:
self._join_one()
if not self._join_one():
time.sleep(0.02)
except Exception:
# shutdown other child processes on failure
self.terminate()
Expand All @@ -119,7 +120,8 @@ def terminate(self) -> None:
self._precvs.pop(tid)
self._pworking -= 1

def _join_one(self) -> None:
def _join_one(self) -> bool:
joined_any = False
for tid, pipe in self._precvs.items():
if pipe.poll():
exc, logs, result = pipe.recv()
Expand All @@ -131,15 +133,17 @@ def _join_one(self) -> None:
self._procs[tid].join()
self._precvs.pop(tid)
self._pworking -= 1
joined_any = True
break
else:
time.sleep(0.02)

while self._precvsWaiting and self._pworking < self.nproc:
newtid, newprecv = self._precvsWaiting.popitem()
self._precvs[newtid] = newprecv
self._procs[newtid].start()
self._pworking += 1

return joined_any


def make_chunks(arguments: Sequence[str], nproc: int, maxbatch: int = 10) -> List[Any]:
# determine how many documents to read in one go
Expand Down

0 comments on commit af53987

Please sign in to comment.