bpo-39349: Add *cancel_futures* to Executor.shutdown() #18057

Open
wants to merge 2 commits into
base: master
from

Conversation

@aeros
Member

aeros commented Jan 19, 2020

@aeros aeros requested a review from gvanrossum Jan 19, 2020
If *cancel_futures* is ``True``, this method will cancel all pending
futures that the executor has not started running. Any futures that
are completed or running won't be cancelled, regardless of the value
of *cancel_futures*.
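
A minimal sketch of the documented behavior, assuming a Python version in which `shutdown()` accepts `cancel_futures` (3.9 or later). The helper names `demo_cancel_futures` and `blocker` are made up for illustration. One worker is kept busy so that the other submissions are still pending when `shutdown()` is called:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel_futures():
    """Return (running_future, pending_futures) after a cancelling shutdown."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        # Signal that the worker thread has picked this task up,
        # then stay busy until released (or until the safety timeout).
        started.set()
        release.wait(timeout=5)
        return "done"

    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(blocker)
    started.wait(timeout=5)  # ensure the first task is actually running

    # The single worker is busy, so these stay in the pending state.
    pending = [executor.submit(pow, 2, n) for n in range(3)]

    # Release the running task shortly after shutdown() starts waiting.
    threading.Timer(0.1, release.set).start()
    executor.shutdown(cancel_futures=True)  # wait defaults to True
    return running, pending


if __name__ == "__main__":
    running, pending = demo_cancel_futures()
    print(running.result(), [f.cancelled() for f in pending])
```

The running future completes normally, while the futures that never started report `cancelled()` as true.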

@brianquinlan

brianquinlan Jan 21, 2020

Contributor

I think you might need to explain what happens when cancel_futures and wait are both true. Does wait cause shutdown to not return until all of the currently running futures are finished?

with self._shutdown_lock:
    self._shutdown = True
    self._work_queue.put(None)
    if cancel_futures:
        while True:

@brianquinlan

brianquinlan Jan 21, 2020

Contributor

Could you add a comment explaining the approach?

@aeros

aeros Jan 22, 2020

Author Member

Sure, I can add a comment here. I didn't before because very similar logic is used directly above in _initializer_failed(); it's effectively the same process of draining the queue, with a few changes. But since this is a public method, I think it makes sense to include a brief description here as well.
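
The queue-draining approach mentioned above can be sketched in isolation. This is an illustrative model only, not the code in the PR: `WorkItem` and `drain_and_cancel` are hypothetical names, and a `None` entry is assumed to be a wake-up sentinel rather than real work.

```python
import queue
from concurrent.futures import Future


class WorkItem:
    """Hypothetical stand-in for the executor's internal work item."""

    def __init__(self, future):
        self.future = future


def drain_and_cancel(work_queue):
    """Remove every queued item and cancel its future.

    Returns the number of futures that were actually cancelled.
    """
    cancelled = 0
    while True:
        try:
            item = work_queue.get_nowait()
        except queue.Empty:
            break  # queue is fully drained
        # Skip the assumed None sentinel; cancel() returns True on success.
        if item is not None and item.future.cancel():
            cancelled += 1
    return cancelled


if __name__ == "__main__":
    q = queue.Queue()
    futures = [Future() for _ in range(3)]
    for fut in futures:
        q.put(WorkItem(fut))
    q.put(None)
    print(drain_and_cancel(q), all(f.cancelled() for f in futures))
```

Using `get_nowait()` in a loop avoids blocking on an empty queue, which matters because the draining happens while the shutdown lock is held.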

@@ -342,6 +342,31 @@ def test_hang_issue12364(self):
         for f in fs:
             f.result()

+    def test_cancel_futures(self):

@brianquinlan

brianquinlan Jan 21, 2020

Contributor

Could you test some more scenarios? Like when wait is also true? When called and the interpreter exits.

@aeros

aeros Jan 22, 2020

Author Member

I addressed the part about wait in #18057 (comment), but I'm not certain about how to write tests that account for the interpreter exiting. Could you elaborate on what that might look like? I'd certainly be glad to add some additional test coverage.

@@ -660,9 +665,12 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                               timeout=timeout)
         return _chain_from_iterable_of_lists(results)

-    def shutdown(self, wait=True):
+    def shutdown(self, wait=True, cancel_futures=False):

@brianquinlan

brianquinlan Jan 22, 2020

Contributor

Maybe new arguments should be keyword-only - there is at least one other PR out now aiming to add an argument to shutdown.

@aeros

aeros Jan 22, 2020

Author Member

Sounds good, I'm in favor of making cancel_futures keyword-only.
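
For reference, a keyword-only parameter is declared by placing it after a bare `*` in the signature. The class below is a hypothetical stand-in (`SketchExecutor` is not part of `concurrent.futures`) that only shows the calling convention being proposed:

```python
class SketchExecutor:
    """Hypothetical executor used only to illustrate the signature."""

    def shutdown(self, wait=True, *, cancel_futures=False):
        # Parameters after the bare * can only be passed by keyword.
        return wait, cancel_futures


if __name__ == "__main__":
    ex = SketchExecutor()
    print(ex.shutdown(False, cancel_futures=True))
    try:
        ex.shutdown(False, True)  # positional use is rejected
    except TypeError as exc:
        print("TypeError:", exc)
```

Keeping new arguments keyword-only means that a second PR adding another argument to `shutdown()` cannot silently change the meaning of existing positional calls.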

@aeros

Member Author

aeros commented Jan 22, 2020

@brianquinlan

I think you might need to explain what happens when cancel_futures and wait are both true. Does wait cause shutdown to not return until all of the currently running futures are finished?

Could you test some more scenarios? Like when wait is also true? When called and the interpreter exits.

The current version of the documentation and tests already covers what happens when cancel_futures and wait are both True, since wait is set to True by default. I figured that the vast majority of users would end up calling it as executor.shutdown(cancel_futures=True).

That being said, I think it would be worth elaborating on the behavior when wait=False and cancel_futures=True. But, the exact behavior varies a bit between executor implementations:

ThreadPoolExecutor - All work items still in the queue (meaning they haven't been assigned to a thread yet) are removed from the queue, and their associated futures are cancelled. This occurs regardless of the value of wait. As for the ones that have been assigned to a thread and are currently running, wait=True allows them to finish.

When wait=False (regardless of cancel_futures), whether or not running futures finish depends on the delay between the executor.shutdown() call and the shutdown of the interpreter. If the interpreter is shut down before the futures finish (terminating the threads without joining them), they get stuck in an indefinite state of running.

The difference between using wait=False and cancel_futures=False vs wait=False and cancel_futures=True is that the pending futures will be cancelled instead of indefinitely pending.
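
The ThreadPoolExecutor case with wait=False and cancel_futures=True can be observed with a short script. This is a sketch under the assumption of Python 3.9 or later; `shutdown_without_waiting` and `blocker` are illustrative names. It only inspects future states right after `shutdown()` returns and does not try to reproduce the interpreter-exit behavior described above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def shutdown_without_waiting():
    """Report future states right after shutdown(wait=False, cancel_futures=True)."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()
        release.wait(timeout=5)  # safety timeout so the thread cannot hang forever
        return "finished"

    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(blocker)
    started.wait(timeout=5)  # the single worker is now occupied

    # Still queued: no worker thread is free to pick these up.
    pending = [executor.submit(pow, 2, n) for n in range(3)]

    executor.shutdown(wait=False, cancel_futures=True)  # returns immediately
    states = {
        "running_done_at_return": running.done(),
        "pending_cancelled": [f.cancelled() for f in pending],
    }

    release.set()  # let the running task complete
    states["running_result"] = running.result(timeout=5)
    return states


if __name__ == "__main__":
    print(shutdown_without_waiting())
```

The pending futures are already cancelled when `shutdown()` returns, while the running one is still in progress and finishes only once it is released.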

ProcessPoolExecutor - Most of the above applies, but the underlying details differ a bit. In PPE, the pending work items are not directly accessible in shutdown(), and instead of a queue they are held in a dictionary whose items are transferred into a call queue and then removed from the dict. So for the purposes of cancel_futures, any work item that's still in the dict when shutdown() is called gets cancelled (specifically when executor._cancel_pending_work is set to True and after shutdown is detected in _queue_management_worker()).

This results in a bit of delay from when the flag is set to when the pending futures are cancelled, but I think this is the best way to implement cancel_futures for PPE without causing substantial performance losses. IMO, it's preferable to lose out on cancelling a few pending futures and have better overall performance.
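
As a rough mental model of the dict-based bookkeeping described above (not the actual ProcessPoolExecutor internals), the sketch below maps work ids directly to futures. `cancel_pending_work` is a hypothetical helper, and popping an entry stands in for a transfer to the call queue:

```python
from concurrent.futures import Future


def cancel_pending_work(pending_work_items):
    """Cancel every future still in the dict and drop the cancelled entries.

    Returns the ids of the work items that were cancelled.
    """
    cancelled_ids = [
        work_id
        for work_id, future in pending_work_items.items()
        if future.cancel()
    ]
    for work_id in cancelled_ids:
        del pending_work_items[work_id]
    return cancelled_ids


if __name__ == "__main__":
    pending = {work_id: Future() for work_id in range(4)}

    # Simulate two items having already been moved to the call queue
    # before the cancellation flag is noticed.
    transferred = [pending.pop(0), pending.pop(1)]
    for future in transferred:
        future.set_running_or_notify_cancel()

    print(cancel_pending_work(pending))          # ids still in the dict
    print([f.cancelled() for f in transferred])  # already-transferred items
```

Items that leave the dict before the flag is noticed are not cancelled, which corresponds to the delay trade-off described above.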

Admittedly, I don't have a strong understanding of the specifics of what occurs when wait=False with PPE, mostly because it's quite difficult to examine in real time. Even just calling executor.shutdown(wait=False) with PPE leads to deadlocks (not even accounting for cancel_futures), at least as of Python 3.7+.

I suspect it has to do with the way non-joined processes are finalized during interpreter shutdown, but that's not an area I'm particularly knowledgeable about. Maybe @pitrou could clarify?

I just found this out while writing an example to demonstrate the above. In my own personal usage of executor.shutdown(), I've never explicitly set wait to False or had a good reason to consider doing so. It's a separate issue, but I think it's worth addressing. The deadlock that occurs for PPE will have to be addressed before I can add tests for executor.shutdown(wait=False), unless they're added specifically to TPE only (but I tried to make the tests fully generic and applicable to both executors).

Examples:

Interaction between wait and cancel_futures: https://gist.github.com/aeros/2e73c8d6dccc94fd863967715826c78d

PPE deadlock demo: https://gist.github.com/aeros/d1ff62b730426584413bca0c8f2ed99d

@aeros

Member Author

aeros commented Jan 22, 2020

Thanks for the review @brianquinlan! I'll go ahead and make some of the recommended changes while waiting on your response to the above comment.
