diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 321a4e5d5d18fb..224b1883808a41 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -265,7 +265,7 @@ def _try_finish(self): # to avoid hanging forever in self._wait as otherwise _exit_waiters # would never be woken up, we wake them up here. for waiter in self._exit_waiters: - if not waiter.cancelled(): + if not waiter.done(): waiter.set_result(self._returncode) if all(p is not None and p.disconnected for p in self._pipes.values()): @@ -278,7 +278,7 @@ def _call_connection_lost(self, exc): finally: # wake up futures waiting for wait() for waiter in self._exit_waiters: - if not waiter.cancelled(): + if not waiter.done(): waiter.set_result(self._returncode) self._exit_waiters = None self._loop = None diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index bf301740741ae7..ce8d7d69cadb7c 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -111,6 +111,44 @@ def test_subprocess_repr(self): ) transport.close() + def test_proc_exited_no_invalid_state_error_on_exit_waiters(self): + # gh-145541: when _connect_pipes hasn't completed (so + # _pipes_connected is False) and the process exits, _try_finish() + # sets the result on exit waiters. Then _call_connection_lost() must + # not call set_result() again on the same waiters. + exceptions = [] + orig_handler = self.loop.call_exception_handler + def exception_handler(context): + exceptions.append(context) + orig_handler(context) + self.loop.call_exception_handler = exception_handler + + waiter = self.loop.create_future() + transport, protocol = self.create_transport(waiter) + + # Simulate a waiter registered via _wait() before the process exits. + exit_waiter = self.loop.create_future() + transport._exit_waiters.append(exit_waiter) + + # _connect_pipes hasn't completed, so _pipes_connected is False. + self.assertFalse(transport._pipes_connected) + + # Simulate process exit. _try_finish() will set the result on + # exit_waiter because _pipes_connected is False, and then schedule + # _call_connection_lost() because _pipes is empty (vacuously all + # disconnected). _call_connection_lost() must skip exit_waiter + # because it's already done. + transport._process_exited(6) + self.loop.run_until_complete(waiter) + + self.assertEqual(exit_waiter.result(), 6) + for context in exceptions: + self.assertNotIsInstance( + context.get('exception'), asyncio.InvalidStateError, + ) + + transport.close() + class SubprocessMixin: @@ -918,6 +956,75 @@ async def main(): asyncio.run(main()) gc_collect() + @unittest.skipIf(sys.platform == 'win32', 'POSIX only') + @warnings_helper.ignore_warnings(category=ResourceWarning) + def test_subprocess_pipe_cancelled_no_invalid_state_error(self): + # gh-145541: when SIGINT arrives while _connect_pipes tasks are + # in flight, asyncio.run() cancels all tasks which leaves + # _pipes_connected=False. When the process then exits, + # _try_finish() sets the result on exit waiters and schedules + # _call_connection_lost(). Before the fix, + # _call_connection_lost() would call set_result() on the same + # waiters again, raising InvalidStateError. + exceptions = [] + + async def main(): + loop = asyncio.get_running_loop() + orig_handler = loop.call_exception_handler + def exception_handler(context): + exceptions.append(context) + orig_handler(context) + loop.call_exception_handler = exception_handler + + # Send SIGINT shortly so it arrives while _connect_pipes + # tasks are in flight. + loop.call_later(0.001, os.kill, os.getpid(), signal.SIGINT) + + procs = [] + + tasks = [ + asyncio.create_task( + asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + ) + for _ in range(50) + ] + + try: + await asyncio.gather(*tasks, return_exceptions=True) + await asyncio.sleep(10) + except asyncio.CancelledError: + pass + finally: + for task in tasks: + if (task.done() + and not task.cancelled() + and task.exception() is None): + procs.append(task.result()) + + for proc in procs: + try: + proc.kill() + except ProcessLookupError: + pass + + for proc in procs: + await proc.wait() + + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass + + gc_collect() + for context in exceptions: + self.assertNotIsInstance( + context.get('exception'), asyncio.InvalidStateError, + ) + if sys.platform != 'win32': # Unix class SubprocessWatcherMixin(SubprocessMixin): diff --git a/Misc/NEWS.d/next/Library/2026-03-05-19-01-28.gh-issue-145551.gItPRl.rst b/Misc/NEWS.d/next/Library/2026-03-05-19-01-28.gh-issue-145551.gItPRl.rst new file mode 100644 index 00000000000000..15b70d734ca3b9 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-05-19-01-28.gh-issue-145551.gItPRl.rst @@ -0,0 +1 @@ +Fix InvalidStateError when cancelling process created by :func:`asyncio.create_subprocess_exec` or :func:`asyncio.create_subprocess_shell`. Patch by Daan De Meyer.