asyncio.Semaphore waiters deque doesn't work #90155
    import asyncio

    async def process(idx: int, semaphore: asyncio.Semaphore) -> None:
        while True:
            async with semaphore:
                print(f'ACQUIRE {idx}')
                await asyncio.sleep(1)

    async def main() -> None:
        semaphore = asyncio.Semaphore(5)
        await asyncio.gather(*[process(idx, semaphore) for idx in range(20)])

    asyncio.run(main())

In console:
An ugly fix is to add asyncio.sleep(0) right before acquiring the semaphore:

    while True:
        await asyncio.sleep(0)
        async with semaphore:
            ...

Also, I found a comment on Stack Overflow about a race condition in the Semaphore implementation, but I can't judge the quality of that comment: |
Good point. In a tight loop, a task can re-acquire the lock right after releasing it, even if there are pending waiters that were scheduled earlier. This is also true for Lock, Condition, Event, etc. The solution requires an async release method. Since that change is not backward compatible, a new method should be added; e.g., the async context manager can easily be modified to use the new method without backward compatibility problems. A hero who can help is welcome! |
Or maybe there is a way to do everything without changing the public API. The idea is: _wake_up_next can create a future which is set by the *woken-up task* when it acquires the semaphore. The acquire method should wait for this future first, before entering the `while self._value <= 0:` loop. If the future is cancelled, ... If no *acquire waiting* future exists, do everything as usual. All other lock objects should be modified as well. |
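To make the fairness requirement concrete, here is a minimal sketch of a semaphore where release() hands the permit directly to the oldest waiter, so a task in a tight loop cannot re-acquire ahead of tasks that queued earlier. This is a different technique (direct hand-off) from the future-based idea described above, and it is not asyncio's implementation; `FairSemaphore` and `demo` are hypothetical names used only for illustration.

```python
import asyncio
from collections import deque


class FairSemaphore:
    """Hypothetical FIFO semaphore (illustration only, not asyncio's code)."""

    def __init__(self, value: int = 1) -> None:
        if value < 0:
            raise ValueError("initial value must be >= 0")
        self._value = value
        self._waiters = deque()  # pending futures, oldest first

    async def acquire(self) -> bool:
        # Fast path only when nobody is queued, so a task that has just
        # released cannot jump ahead of earlier waiters.
        if self._value > 0 and not self._waiters:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        try:
            await fut  # resolved by release(); the permit is then ours
        except asyncio.CancelledError:
            if fut.done() and not fut.cancelled():
                # We were handed a permit but cancelled before using it:
                # pass it on so it is not lost.
                self.release()
            raise
        return True

    def release(self) -> None:
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():  # skip waiters that were cancelled
                fut.set_result(None)  # direct hand-off; _value stays put
                return
        self._value += 1

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.release()


async def demo() -> list:
    sem = FairSemaphore(2)
    order = []

    async def worker(idx: int) -> None:
        for _ in range(3):  # tight loop: release, then re-acquire at once
            async with sem:
                order.append(idx)
                await asyncio.sleep(0.01)

    await asyncio.gather(*(worker(i) for i in range(6)))
    return order


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch every one of the six workers gets a turn before any worker gets a second one, because a releasing task always re-queues behind the existing waiters. The trade-off is that release() stays synchronous and the counter is only incremented when nobody is waiting.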
Thanks for the response!
I will try to make a PR :raising_hand: |
Andrew, the same problem exists in asyncio.Queue which is also critical. Here's how I fixed it in edgedb code base: https://github.com/edgedb/edgedb/blob/08e41341024828df22a01cd690b11fcff00bca5e/edb/server/compiler_pool/queue.py#L51-L74 |
Thanks, Yuri. |
Maybe not the end of the story. This shows how to create a broken semaphore (on commit 1971014):

    from asyncio import CancelledError
    from asyncio import Semaphore
    from asyncio import create_task
    from asyncio import gather
    from asyncio import run
    from asyncio import sleep

    async def process(idx, sem, tasks):
        await sleep(idx)
        async with sem:
            await sleep(2)
        # cancel the second task right after the semaphore is released
        tasks[1].cancel()

    async def main():
        print('let me run for 3 seconds')
        sem = Semaphore(1)
        tasks = []
        for idx in range(2):
            tasks.append(create_task(process(idx, sem, tasks)))
        await sleep(3)
        await gather(*tasks, return_exceptions=True)
        print(f'sem._value == {sem._value}')
        print('sem is still...', end='', flush=True)
        async with sem:
            print('working!')

    run(main())

Expected output:
Actual output:
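If a semaphore does end up in a broken state, a plain `async with sem:` would presumably block forever, which makes a repro like this awkward to run unattended. A minimal sketch of a timeout-guarded probe follows; `is_usable` and `demo` are hypothetical helpers, not part of asyncio, and the timeouts are arbitrary assumptions.

```python
import asyncio


async def is_usable(sem: asyncio.Semaphore, timeout: float = 0.5) -> bool:
    """Hypothetical helper: True if `sem` can be acquired within `timeout`."""
    try:
        await asyncio.wait_for(sem.acquire(), timeout)
    except asyncio.TimeoutError:
        return False  # no permit became available in time
    sem.release()  # give the permit back; we only wanted to probe
    return True


async def demo() -> tuple:
    free = asyncio.Semaphore(1)       # one permit available
    exhausted = asyncio.Semaphore(0)  # no permits, acquire() would block
    return (await is_usable(free), await is_usable(exhausted, timeout=0.05))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the probe cannot tell a broken semaphore from one that is legitimately held by another task, so it is only meaningful at a point where all permits are expected to be free, such as the end of main() in the script above.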
PR 93222 has been made for this. Might be helpful. |
I was going through the changes trying to understand how all this works in various cases and I noticed that the locked method was not adjusted. It currently reads:
but after the latest changes that added the
? Since the comment indicates that this method returns
it leads me to think that locked method needs to be adjusted to have the same condition. Someone with a better understanding of how this is supposed to work, please weigh in. |