Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions Lib/asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def __init__(self, loop, protocol, args, shell,
self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
self._pipes_connected = False

if stdin == subprocess.PIPE:
self._pipes[0] = None
Expand Down Expand Up @@ -214,7 +213,6 @@ async def _connect_pipes(self, waiter):
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
self._pipes_connected = True

def _call(self, cb, *data):
if self._pending_calls is not None:
Expand All @@ -235,6 +233,16 @@ def _process_exited(self, returncode):
if self._loop.get_debug():
logger.info('%r exited with return code %r', self, returncode)
self._returncode = returncode

# gh-119710: Wake up futures waiting for wait() as soon as the process
# exits. The pipe transports now check for the loop being closed before
# scheduling a callback preventing gh-114177. This is consistent with
# the behavior prior to 3.11 and the documented semantics in _wait().
for waiter in self._exit_waiters:
if not waiter.done():
waiter.set_result(returncode)
Comment on lines +241 to +243

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be changed back to not waiter.cancelled() (reverting that part of #145554) since this should now be the only place they are otherwise resolved.

self._exit_waiters = None

if self._proc.returncode is None:
# asyncio uses a child watcher: copy the status into the Popen
# object. On Python 3.6, it is required to avoid a ResourceWarning.
Expand All @@ -258,15 +266,7 @@ def _try_finish(self):
assert not self._finished
if self._returncode is None:
return
if not self._pipes_connected:
# self._pipes_connected can be False if not all pipes were connected
# because either the process failed to start or the self._connect_pipes task
# got cancelled. In this broken state we consider all pipes disconnected and
# to avoid hanging forever in self._wait as otherwise _exit_waiters
# would never be woken up, we wake them up here.
for waiter in self._exit_waiters:
if not waiter.done():
waiter.set_result(self._returncode)

if all(p is not None and p.disconnected
for p in self._pipes.values()):
self._finished = True
Expand All @@ -276,11 +276,6 @@ def _call_connection_lost(self, exc):
try:
self._protocol.connection_lost(exc)
finally:
# wake up futures waiting for wait()
for waiter in self._exit_waiters:
if not waiter.done():
waiter.set_result(self._returncode)
self._exit_waiters = None
self._loop = None
self._proc = None
self._protocol = None
Expand Down
44 changes: 41 additions & 3 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ def test_proc_exited_no_invalid_state_error_on_exit_waiters(self):
exit_waiter = self.loop.create_future()
transport._exit_waiters.append(exit_waiter)

# _connect_pipes hasn't completed, so _pipes_connected is False.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My inclination would be to remove this test fully since it should be covered by test_wait_even_if_pipe_is_open in a less implementation dependent way.
With a single place for notifying waiters (instead of 3) this feels too specific.

self.assertFalse(transport._pipes_connected)

# Simulate process exit. _try_finish() will set the result on
# exit_waiter because _pipes_connected is False, and then schedule
# _call_connection_lost() because _pipes is empty (vacuously all
Expand Down Expand Up @@ -436,6 +433,47 @@ async def len_message(message):
self.assertEqual(output.rstrip(), b'3')
self.assertEqual(exitcode, 0)

def test_wait_even_if_pipe_is_open(self):
# gh-119710: Process.wait() must return once the process exits even
# if its stdout pipe is inherited by a grandchild that keeps it open,
# so the pipe never reaches EOF. Otherwise wait() hangs forever
# despite the returncode being known.

async def run():
# Just setup a pipe to pass to the grandchild for reading to ensure it dies.
# Inheritable is to allow it to be passed on windows
r, w = os.pipe()
os.set_inheritable(r, True)

code = textwrap.dedent(f"""\
import subprocess, sys
subprocess.run([sys.executable, "-c", "import sys;sys.stdin.read()"])
""")

proc = await asyncio.create_subprocess_exec(
sys.executable, "-c", code,
# This will be inherited by granchild and should not prevent
# *this* process from firing .wait().
stdout=subprocess.PIPE,
stdin=r,
pass_fds=(r,) if sys.platform != "win32" else (),
close_fds=False if sys.platform == "win32" else True,
)
os.close(r)

try:
# Ensure we start waiting before the process is killed.
wait_proc = asyncio.create_task(proc.wait())
await asyncio.sleep(0.1)
proc.kill()
await asyncio.wait_for(wait_proc, timeout=2.0)
finally:
os.close(w) # Allows the grandchild to exit
if proc.stdout is not None:
await proc.stdout.read()

self.loop.run_until_complete(run())

def test_empty_input(self):

async def empty_input():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix :mod:`asyncio` subprocess :meth:`~asyncio.subprocess.Process.wait`
hanging when the process has exited but one of its pipes is kept open by an
inherited child process (so the pipe never reaches EOF). ``wait()`` now
returns as soon as the process exits, regardless of the pipes' state.
Loading