
asyncio.run_coroutine_threadsafe leaves underlying cancelled asyncio task running #105836

Open
@gh-andre


Bug report

When an asyncio task is created from another thread via asyncio.run_coroutine_threadsafe, the call returns a concurrent.futures.Future that wraps the underlying asyncio.Task.
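A minimal sketch of the normal (non-cancelled) path follows. It assumes the event loop runs in the main thread and the submitting code runs in a worker thread supplied by asyncio.to_thread; the names work and worker are illustrative only. The worker gets a concurrent.futures.Future back and blocks on result() while the coroutine runs as an asyncio.Task on the loop.

```python
import asyncio


async def work(x: int) -> int:
    # Runs on the event loop thread as an asyncio.Task
    await asyncio.sleep(0.01)
    return x * 2


def worker(loop: asyncio.AbstractEventLoop) -> int:
    # Called on a non-loop thread: schedules work() on the loop and
    # receives a concurrent.futures.Future chained to the asyncio.Task
    fut = asyncio.run_coroutine_threadsafe(work(21), loop)
    # Blocks only this worker thread until the task has finished
    return fut.result(timeout=5)


async def main() -> int:
    loop = asyncio.get_running_loop()
    # Run the blocking worker in a separate thread so the loop stays free
    return await asyncio.to_thread(worker, loop)


result = asyncio.run(main())
print(result)  # 42
```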

Typically, future.result() or future.exception() is used to wait for completion. However, once the returned future is cancelled, there is no built-in way to wait for the underlying asyncio task to finish, so the task may still be running when the application shuts down, possibly accessing dangling data.
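The gap can be observed in a small sketch (illustrative names). For brevity it submits from the loop thread itself; as far as I can tell from the CPython 3.10 sources, cancellation still takes the same path as with a real worker thread, because the concurrent future only schedules Task.cancel onto the loop via call_soon_threadsafe. Right after cancel(), the concurrent future already reports cancelled() and done(), yet the coroutine's finally block has not run.

```python
import asyncio

events: list = []


async def slow() -> None:
    try:
        await asyncio.sleep(10)
    finally:
        # Runs on the loop only once the cancellation is actually delivered
        events.append("coroutine finally")


async def main() -> None:
    loop = asyncio.get_running_loop()
    fut = asyncio.run_coroutine_threadsafe(slow(), loop)
    await asyncio.sleep(0.05)  # let the task start and reach its sleep
    fut.cancel()
    # The concurrent future reports completion immediately...
    events.append(f"cancelled={fut.cancelled()} done={fut.done()}")
    # ...but the coroutine's cleanup has not happened yet
    events.append(f"finally ran: {'coroutine finally' in events}")
    await asyncio.sleep(0.05)  # give the loop a chance to run the cancellation


asyncio.run(main())
print(events)
# ['cancelled=True done=True', 'finally ran: False', 'coroutine finally']
```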

The example below reproduces the behavior described above. It cannot be made smaller without losing some aspect of what is going on.

The output of this code is as follows:

About to create an asyncio task
asyncio task is set up
asyncio task is running
Task cancelled (callback)
Cancelled?: True
Done?: True
Cannot access task.result()
Cannot access task.exception()
All done. No app code should run after this.
asyncio task is cancelled; rethrowing...   <-- runs after the app is shut down
Setting the asyncio task event             <-- same

Perhaps concurrent.futures.Future.cancel() should accept a timeout and wait for the underlying asyncio task to complete, or some additional method should be provided for that.
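Until something like that exists, one possible workaround is sketched below. It assumes the submitting thread can keep the asyncio.Task itself rather than only a concurrent.futures.Future; submit_tracked and cancel_and_wait are hypothetical helpers, not asyncio APIs. A done callback on the task sets a threading.Event, so cancel_and_wait can block, with an optional timeout, until the task has actually unwound. Done callbacks also fire when a task is cancelled before its first step, so the wait cannot hang in that case.

```python
import asyncio
import concurrent.futures
import threading
from typing import Any, Coroutine, Optional, Tuple


def submit_tracked(
    coro: Coroutine[Any, Any, Any], loop: asyncio.AbstractEventLoop
) -> Tuple[concurrent.futures.Future, threading.Event]:
    """Hypothetical helper: schedule coro on loop from another thread.
    Returns (future resolving to the asyncio.Task, Event set when it is done)."""
    handle: concurrent.futures.Future = concurrent.futures.Future()
    finished = threading.Event()

    def create() -> None:
        # Runs on the loop thread
        task = loop.create_task(coro)
        task.add_done_callback(lambda _t: finished.set())
        handle.set_result(task)

    loop.call_soon_threadsafe(create)
    return handle, finished


def cancel_and_wait(loop: asyncio.AbstractEventLoop,
                    handle: concurrent.futures.Future,
                    finished: threading.Event,
                    timeout: Optional[float] = None) -> bool:
    """Hypothetical helper: cancel the Task and block until it has unwound.
    Must not be called on the loop's own thread (it would deadlock)."""
    task = handle.result(timeout)  # wait until the Task exists
    loop.call_soon_threadsafe(task.cancel)  # Task.cancel is not thread-safe
    return finished.wait(timeout)


def demo() -> list:
    log: list = []
    started = threading.Event()
    loop = asyncio.new_event_loop()
    loop_thread = threading.Thread(target=loop.run_forever)
    loop_thread.start()

    async def job() -> None:
        started.set()
        try:
            await asyncio.sleep(10)
        finally:
            log.append("cleanup")  # runs on the loop thread

    handle, finished = submit_tracked(job(), loop)
    started.wait(5)  # make sure the job has reached its sleep
    ok = cancel_and_wait(loop, handle, finished, timeout=5)
    log.append(f"waited: {ok}")  # the job's cleanup has already run here

    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join()
    loop.close()
    return log


result = demo()
print(result)  # ['cleanup', 'waited: True']
```

The demo runs the loop in a background thread and calls cancel_and_wait from the main thread. Each of the two internal waits uses the full timeout, so the worst case is roughly twice the timeout.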

Here's the sample code.

import asyncio
import concurrent.futures
import threading

class MyApp:
    def __init__(self) -> None:
        self.task: concurrent.futures.Future[int]|None = None

        self.task_event = asyncio.Event()
        self.task_event.clear()

    # An asyncio task that runs in the specified event loop
    async def process_task(self) -> int:
        try:
            self.task_event.clear()
            print("asyncio task is running")
            await asyncio.sleep(10)
            print("asyncio task is finished")
            return 123
        except asyncio.CancelledError:
            print("asyncio task is cancelled; rethrowing...")
            raise
        finally:
            print("Setting the asyncio task event")
            self.task_event.set()

    # Called immediately from task.cancel(), before the underlying
    # asyncio task completes.
    def task_done(self, future: concurrent.futures.Future[int]) -> None:
        if future.cancelled():
            print("Task cancelled (callback)")
        elif future.exception() is not None:
            print("Task error %s (callback)" % future.exception())
        else:
            print("Task is done with result %d (callback)" % future.result())

    # Runs on the worker thread. asyncio.get_running_loop() would raise
    # RuntimeError there, so the caller passes the loop in explicitly.
    def start_async_task(self, loop: asyncio.AbstractEventLoop, thread_event: threading.Event) -> None:

        print("About to create an asyncio task")

        self.task = asyncio.run_coroutine_threadsafe(self.process_task(), loop)
        self.task.add_done_callback(self.task_done)

        print("asyncio task is set up")
        thread_event.set()

    # This method is supposed to be the last call to the application,
    # which shuts down all services and cancels all outstanding work.
    async def shutdown(self) -> None:

        # Calls the done callback immediately and returns before the
        # underlying asyncio task is cancelled (the cancellation is only
        # scheduled on the loop), rendering the future unusable via
        # standard methods.
        self.task.cancel()

        # This event compensates for the lack of a way to wait for the
        # underlying asyncio task. Without this wait, the asyncio task
        # runs after the code that manages all this is gone.
        #
        # Perhaps Future.cancel() should provide this functionality?
        #
        #print("Waiting for a task to complete")
        #await self.task_event.wait()

        # returns True, while underlying asyncio task is still running
        print("Cancelled?: %s" % self.task.cancelled())
        print("Done?: %s" % self.task.done())

        # supposed to wait for result, but because it's cancelled, will throw
        try:
            print("Result?: %d" % self.task.result())
        except concurrent.futures.CancelledError:
            print("Cannot access task.result()")

        # or wait for exception, but because it's cancelled, will throw as well
        try:
            print("Exception?: %s" % str(self.task.exception()))
        except concurrent.futures.CancelledError:
            print("Cannot access task.exception()")

async def main() -> None:
    myapp = MyApp()
    loop = asyncio.get_running_loop()

    thread_event = threading.Event()
    thread_event.clear()

    # emulates some worker thread (e.g. APScheduler); start() runs the
    # target on a new thread, whereas run() would call it on this one
    runner = threading.Thread(target=myapp.start_async_task, args=[loop, thread_event])
    runner.start()

    # wait for the worker without blocking the event loop
    await asyncio.to_thread(thread_event.wait)

    await asyncio.sleep(1)

    await myapp.shutdown()

    await asyncio.sleep(0)

    print("All done. No app code should run after this.")

asyncio.run(main())

If the wait under "Waiting for a task to complete" is uncommented, which simulates Future.cancel() waiting for the underlying asyncio task, the code works as expected: the task's cancellation messages are printed before "All done."
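On the loop side, a sketch of an alternative to the hand-rolled task_event follows. It assumes the application can store the asyncio.Task handle; App, spawn and start_from_thread are hypothetical names. The worker thread asks the loop to create the task and receives the Task object back, and shutdown() cancels it and awaits asyncio.wait(), which returns only after the task's cleanup has run.

```python
import asyncio
from typing import Optional

log: list = []


async def process_task() -> int:
    try:
        await asyncio.sleep(10)
        return 123
    finally:
        log.append("task cleanup")


class App:
    def __init__(self) -> None:
        # Hypothetical: keep the asyncio.Task, not the concurrent wrapper
        self.task: Optional[asyncio.Task] = None

    def start_from_thread(self, loop: asyncio.AbstractEventLoop) -> None:
        async def spawn() -> asyncio.Task:
            # Runs on the loop thread; returns the handle without awaiting it
            return asyncio.create_task(process_task())

        # Blocks this worker thread only until the Task has been created
        self.task = asyncio.run_coroutine_threadsafe(spawn(), loop).result()

    async def shutdown(self) -> None:
        assert self.task is not None
        self.task.cancel()
        # Waits until the task has actually finished, cleanup included
        await asyncio.wait({self.task})
        log.append(f"task cancelled: {self.task.cancelled()}")


async def main() -> None:
    app = App()
    loop = asyncio.get_running_loop()
    await asyncio.to_thread(app.start_from_thread, loop)
    await asyncio.sleep(0.05)  # let the task start
    await app.shutdown()
    log.append("all done")


asyncio.run(main())
print(log)  # ['task cleanup', 'task cancelled: True', 'all done']
```

asyncio.wait() is used instead of awaiting the task directly, so the task's own CancelledError is not confused with a cancellation of shutdown() itself.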

Your environment

  • CPython versions tested on: 3.10.10
  • Operating system and architecture: Microsoft Windows [Version 10.0.19045.3086]
