
Add timeout to multiprocessing.queues.JoinableQueue.join #104448

@tmi

Feature or enhancement

A number of join methods, such as the one on Process, expose a timeout option. This supports the pattern "let the child process compute, but if it crashes or takes too long, take some other action". However, JoinableQueue exposes only a parameter-less join which always blocks until all tasks are done, potentially forever.

This enhancement suggests adding a timeout option, implemented by passing it down to the wait on the already existing internal condition. The default value None retains the original behaviour. To detect whether join returned because all tasks completed or because the timeout expired, a new is_done method is added (though I think other means, such as raising from join, also make sense).
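
A minimal sketch of what this could look like, written here as a subclass for illustration (the real change would live on JoinableQueue itself). It assumes the current internals of multiprocessing.queues.JoinableQueue, namely the _cond condition variable and the _unfinished_tasks semaphore; those are implementation details and the actual patch may differ:

import multiprocessing as mp
import multiprocessing.queues

class TimeoutJoinableQueue(multiprocessing.queues.JoinableQueue):
    def join(self, timeout=None):
        # timeout=None keeps the current behaviour: block until every put()
        # has been matched by a task_done().
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait(timeout)

    def is_done(self):
        # True iff all tasks have been marked done, letting callers tell a
        # completed join() apart from one that returned on timeout.
        return self._unfinished_tasks._semlock._is_zero()

# multiprocessing.queues classes take an explicit context:
q = TimeoutJoinableQueue(ctx=mp.get_context())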

Pitch

An example of usage:

from multiprocessing import JoinableQueue, Process

q = JoinableQueue()
for task in get_tasks():
    q.put(task)
ps = [Process(target=worker_entrypoint, args=(q,)) for _ in range(worker_count)]
for p in ps:
    p.start()

q.join(timeout=10)
if q.is_done():
    ...  # success
else:
    ...  # kill the child processes and fail, or retry, ...

for p in ps:
    p.join()
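
For completeness, the worker_entrypoint placeholder above could look roughly like the sketch below; process_task is a hypothetical per-item function, and the get() timeout is just one possible way for workers to exit once the queue stays empty:

import queue

def worker_entrypoint(q):
    while True:
        try:
            task = q.get(timeout=1)  # give up once the queue stays empty
        except queue.Empty:
            return
        try:
            process_task(task)       # hypothetical per-item work
        finally:
            q.task_done()            # this is what q.join() waits for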

While this simple example could also be handled by a Pool or ProcessPoolExecutor, there are use cases that rely on queue semantics yet need a join with a timeout. I currently have such a use case and solve it by subclassing JoinableQueue. I'll open a PR right away.

Previous discussion

This is somewhat related to #96471, but less ambitious in scope and, I believe, implementable independently.
