bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566

Open: wants to merge 18 commits into base: main

graingert
Contributor

@graingert graingert commented Feb 20, 2020

… input iterables appropriately

bugs.python.org/issue29842


Re-creation of #707 with conflicts fixed and versionchanged updated to Python 3.9.

/cc @MojoVampire

https://bugs.python.org/issue29842


@rdarder rdarder left a comment

Hi there! Sorry for intruding. I was looking for this feature and decided to try this patch out. It works great! I found one small issue, so I left you a comment.
Thank you!

    except StopIteration:
        argsiter = None
    else:
        fs.append(self.submit(fn, *args))

@rdarder rdarder Mar 22, 2021


If the executor has been shut down, this call to submit() will raise:
RuntimeError: cannot schedule new futures after shutdown
However, the base Executor holds no state, so at this level it is hard to tell whether the executor has been shut down.
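
A minimal sketch of the behaviour described above, using `ThreadPoolExecutor` as the concrete executor (the helper name `submit_after_shutdown` is made up for illustration and is not part of the patch):

```python
from concurrent.futures import ThreadPoolExecutor


def submit_after_shutdown():
    """Return the error message raised when submitting to a shut-down pool."""
    pool = ThreadPoolExecutor(max_workers=1)
    pool.shutdown()
    try:
        pool.submit(print, "never runs")
    except RuntimeError as exc:
        # submit() refuses new work once shutdown() has been called.
        return str(exc)
    return None


message = submit_after_shutdown()
print(message)
```

A lazy map() calls submit() each time the result iterator is advanced, so this error can surface long after map() itself has returned.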

Contributor Author

@graingert graingert Jul 23, 2022


@rdarder
This could catch that RuntimeError and simply return, or the executor could keep a WeakSet of result_iterators and close them on shutdown.
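
The first option might look roughly like the sketch below. It is an illustration only, not the code in this PR: `submit_until_shutdown` is a hypothetical helper, and it treats any RuntimeError from submit() as a signal to stop.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_until_shutdown(executor, fn, argsiter):
    """Hypothetical helper: lazily submit calls, ending quietly on shutdown."""
    for args in argsiter:
        try:
            fut = executor.submit(fn, *args)
        except RuntimeError:
            # The executor refused the work (for example after shutdown()),
            # so stop pulling from the input instead of propagating the error.
            return
        yield fut


pool = ThreadPoolExecutor(max_workers=2)
futures_iter = submit_until_shutdown(pool, pow, zip(range(5), [2] * 5))
first = next(futures_iter).result()   # pow(0, 2)
second = next(futures_iter).result()  # pow(1, 2)
pool.shutdown()
remaining = list(futures_iter)        # submit() now raises, so the generator ends
```

One trade-off of this approach: the input item that was pulled just before the failed submit() is consumed and silently dropped, and an unrelated RuntimeError would be swallowed too.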

.. versionchanged:: 3.5
   Added the *chunksize* argument.

.. versionchanged:: 3.9
Contributor Author

@graingert graingert Jul 21, 2021


Suggested change
- .. versionchanged:: 3.9
+ .. versionchanged:: 3.11


Tronic commented Oct 29, 2021

While waiting for this to be included in Python, one can easily copy and paste the map function from this version and call it with the executor as the first argument. I hope this gets merged soon!
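
For readers who want to see the general shape of such a standalone function, here is a rough sketch. It is not the map function from this PR: `lazy_map` and its `prefetch` parameter are hypothetical, and unlike `Executor.map` it submits nothing until the iterator is first advanced.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor


def lazy_map(executor, fn, *iterables, prefetch=4):
    """Hypothetical stand-in for Executor.map with at most `prefetch` calls in flight."""
    argsiter = zip(*iterables)
    # Submit only the first `prefetch` calls instead of draining the input.
    pending = collections.deque(
        executor.submit(fn, *args) for args in itertools.islice(argsiter, prefetch)
    )
    try:
        while pending:
            # Results are yielded in input order, oldest future first.
            result = pending[0].result()
            pending.popleft()
            # Top the window back up with at most one new call.
            for args in itertools.islice(argsiter, 1):
                pending.append(executor.submit(fn, *args))
            yield result
    finally:
        # Runs when the generator is closed early: drop the queued work.
        for fut in pending:
            fut.cancel()


with ThreadPoolExecutor(max_workers=2) as pool:
    # The executor is passed as the first argument; the input is unbounded.
    results = lazy_map(pool, pow, itertools.count(), itertools.repeat(2))
    squares = list(itertools.islice(results, 5))
    results.close()

print(squares)
```

Because only a small window of calls is ever submitted, the unbounded `itertools.count()` input is consumed a few items at a time rather than exhausted up front.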

@kumaraditya303 kumaraditya303 self-requested a review Jul 18, 2022
Doc/library/concurrent.futures.rst (outdated, resolved)
@kumaraditya303 kumaraditya303 added the type-feature (A feature request or enhancement) and 3.12 labels Jul 18, 2022
Contributor

kumaraditya303 commented Jul 23, 2022

@graingert Tests are failing; can you fix them and rebase onto main? I'll then review and run it through the buildbots.

Lib/concurrent/futures/_base.py (outdated, resolved)
    else:
        yield fs.pop().result(end_time - time.monotonic())
Contributor Author

@graingert graingert Jul 23, 2022


This doesn't correctly cancel the future that is currently being waited on: #95166

import concurrent.futures
import contextlib
import sys
import threading


def main():
    stop_event = threading.Event()
    log = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:

        def print_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        # Occupy the only worker so the mapped calls stay queued.
        pool.submit(print_n_wait, ident="first")
        try:
            # Closing the iterator should cancel every call that map() submitted.
            with contextlib.closing(pool.map(print_n_wait, ["second", "third"], timeout=1)) as gen:
                try:
                    next(gen)
                except concurrent.futures.TimeoutError:
                    print("timed out")
                else:
                    raise RuntimeError("timeout expected")
        finally:
            stop_event.set()

    # Only "first" should ever have run; "second" and "third" should have been cancelled.
    assert log == ["ident='first' started", "ident='first' stopped"], f"{log=} is wrong"


if __name__ == "__main__":
    sys.exit(main())
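
The underlying problem can be shown without any threads. The sketch below is an illustration under assumptions, not the fix adopted in #95166: `results_pop_first` mirrors the shape of the quoted `yield fs.pop().result(...)` line, while `results_peek_first` is one hypothetical way to keep the waited-on future visible to the cleanup code. Both names, and the helper `cancelled_after_timeout`, are made up for this example.

```python
from concurrent.futures import Future, TimeoutError


def results_pop_first(fs, timeout):
    """Pops the future before waiting, so a timeout leaves it untracked."""
    try:
        while fs:
            yield fs.pop().result(timeout)
    finally:
        for fut in fs:
            fut.cancel()


def results_peek_first(fs, timeout):
    """Hypothetical variant: waits while the future is still in `fs`."""
    try:
        while fs:
            result = fs[-1].result(timeout)
            fs.pop()
            yield result
    finally:
        for fut in fs:
            fut.cancel()


def cancelled_after_timeout(make_iter):
    """Return (waited.cancelled(), queued.cancelled()) after a timed-out next()."""
    waited, queued = Future(), Future()  # never completed, so result() times out
    gen = make_iter([queued, waited], timeout=0.01)
    try:
        next(gen)
    except TimeoutError:
        pass
    return waited.cancelled(), queued.cancelled()


pop_outcome = cancelled_after_timeout(results_pop_first)
peek_outcome = cancelled_after_timeout(results_peek_first)
print(pop_outcome, peek_outcome)
```

In the pop-first version the future being waited on has already left `fs` when the timeout fires, so the `finally` block cancels only the queued one. That matches the reproducer above, where a call that should have been cancelled still runs.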

Contributor

kumaraditya303 commented Jul 23, 2022

@graingert You can run make patchcheck to fix the docs CI.

Labels
3.12, awaiting review, performance (Performance or resource usage), stdlib (Python modules in the Lib dir), type-feature (A feature request or enhancement)

8 participants