-
-
Notifications
You must be signed in to change notification settings - Fork 34.3k
GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-111693: Propagate correct asyncio.CancelledError instance out of asyncio.Condition.wait() #111694
Changes from 8 commits
c3aff47
111e74a
6d41771
4e16fa7
ad56f29
30e0ec4
6d59820
c060f40
62bd6cd
01046c5
8802dfa
f7c103c
1572b2b
dce420e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,6 +95,9 @@ async def acquire(self): | |
| This method blocks until the lock is unlocked, then sets it to | ||
| locked and returns True. | ||
| """ | ||
| # Implement fair scheduling, where thread always waits | ||
| # its turn. | ||
| # Jumping the queue if all are cancelled is an optimization. | ||
| if (not self._locked and (self._waiters is None or | ||
| all(w.cancelled() for w in self._waiters))): | ||
| self._locked = True | ||
|
|
@@ -105,19 +108,20 @@ async def acquire(self): | |
| fut = self._get_loop().create_future() | ||
| self._waiters.append(fut) | ||
|
|
||
| # Finally block should be called before the CancelledError | ||
| # handling as we don't want CancelledError to call | ||
| # _wake_up_first() and attempt to wake up itself. | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| try: | ||
| try: | ||
| await fut | ||
| finally: | ||
| self._waiters.remove(fut) | ||
| except exceptions.CancelledError: | ||
| except BaseException: | ||
|
||
| # Ensure the lock invariant: If lock is not claimed (or about to be by us) | ||
| # and there is a Task in waiters, | ||
| # ensure that that Task (now at the head) will run. | ||
|
||
| if not self._locked: | ||
| self._wake_up_first() | ||
| raise | ||
|
|
||
| # assert self._locked is False | ||
| self._locked = True | ||
| return True | ||
|
|
||
|
|
@@ -269,17 +273,22 @@ async def wait(self): | |
| self._waiters.remove(fut) | ||
|
|
||
| finally: | ||
| # Must reacquire lock even if wait is cancelled | ||
| cancelled = False | ||
| # Must reacquire lock even if wait is cancelled. | ||
gvanrossum marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # We only catch CancelledError here, since we don't want any | ||
| # other (fatal) errors with the future to cause us to spin. | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| err = None | ||
| while True: | ||
| try: | ||
| await self.acquire() | ||
| break | ||
| except exceptions.CancelledError: | ||
| cancelled = True | ||
| except exceptions.CancelledError as e: | ||
| err = e | ||
|
|
||
| if cancelled: | ||
| raise exceptions.CancelledError | ||
| if err: | ||
| try: | ||
| raise err # Re-raise most recent exception instance | ||
| finally: | ||
| err = None # Break reference cycles | ||
|
|
||
| async def wait_for(self, predicate): | ||
| """Wait until a predicate becomes true. | ||
|
|
@@ -378,16 +387,17 @@ async def acquire(self): | |
| fut = self._get_loop().create_future() | ||
| self._waiters.append(fut) | ||
|
|
||
| # Finally block should be called before the CancelledError | ||
| # handling as we don't want CancelledError to call | ||
| # _wake_up_first() and attempt to wake up itself. | ||
| try: | ||
| try: | ||
| await fut | ||
| finally: | ||
| self._waiters.remove(fut) | ||
| except exceptions.CancelledError: | ||
| if not fut.cancelled(): | ||
| except BaseException: | ||
| if fut.done() and not fut.cancelled(): | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Our Future was successfully set to True via _wake_up_next(), | ||
| # but we are not about to successfully acquire(). Therefore we | ||
| # must undo the bookkeeping already done and attempt to wake | ||
| # up someone else. | ||
| self._value += 1 | ||
| self._wake_up_next() | ||
| raise | ||
|
|
@@ -414,6 +424,7 @@ def _wake_up_next(self): | |
| if not fut.done(): | ||
| self._value -= 1 | ||
| fut.set_result(True) | ||
| # assert fut.true() and not fut.cancelled() | ||
|
||
| return | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -758,6 +758,63 @@ async def test_timeout_in_block(self): | |
| with self.assertRaises(asyncio.TimeoutError): | ||
| await asyncio.wait_for(condition.wait(), timeout=0.5) | ||
|
|
||
| async def test_cancelled_error_wakeup(self): | ||
| """Test that a cancelled error, received when awaiting wakeup | ||
| will be re-raised un-modified. | ||
| """ | ||
|
||
| wake = False | ||
| raised = None | ||
| cond = asyncio.Condition() | ||
|
|
||
| async def func(): | ||
| nonlocal raised | ||
| async with cond: | ||
| with self.assertRaises(asyncio.CancelledError) as err: | ||
| await cond.wait_for(lambda: wake) | ||
| raised = err.exception | ||
| raise raised | ||
|
|
||
| task = asyncio.create_task(func()) | ||
| await asyncio.sleep(0) | ||
| # Task is waiting on the condition, cancel it there | ||
| task.cancel(msg="foo") | ||
| with self.assertRaises(asyncio.CancelledError) as err: | ||
| await task | ||
| self.assertEqual(err.exception.args, ("foo",)) | ||
| # we should have got the _same_ exception instance as the one originally raised | ||
| self.assertIs(err.exception, raised) | ||
|
|
||
| async def test_cancelled_error_re_aquire(self): | ||
| """Test that a cancelled error, received when re-aquiring lock, | ||
| will be re-raised un-modified. | ||
| """ | ||
| wake = False | ||
| raised = None | ||
| cond = asyncio.Condition() | ||
|
|
||
| async def func(): | ||
| nonlocal raised | ||
| async with cond: | ||
| with self.assertRaises(asyncio.CancelledError) as err: | ||
| await cond.wait_for(lambda: wake) | ||
| raised = err.exception | ||
| raise raised | ||
|
|
||
| task = asyncio.create_task(func()) | ||
| await asyncio.sleep(0) | ||
| # Task is waiting on the condition | ||
| await cond.acquire() | ||
| wake = True | ||
| cond.notify() | ||
| await asyncio.sleep(0) | ||
| # task is now trying to re-acquire the lock, cancel it there | ||
| task.cancel(msg="foo") | ||
| cond.release() | ||
| with self.assertRaises(asyncio.CancelledError) as err: | ||
| await task | ||
| self.assertEqual(err.exception.args, ("foo",)) | ||
| # we should have got the _same_ exception instance as the one originally raised | ||
| self.assertIs(err.exception, raised) | ||
|
|
||
| class SemaphoreTests(unittest.IsolatedAsyncioTestCase): | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.