Skip to content

gh-91378: Allow subprocess pass-thru with stdout/stderr capture #32344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion Doc/library/subprocess.rst
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ functions.
start_new_session=False, pass_fds=(), *, group=None, \
extra_groups=None, user=None, umask=-1, \
encoding=None, errors=None, text=None, pipesize=-1, \
process_group=None)
process_group=None, read_stdout_callback=None, \
read_stderr_callback=None)

Execute a child program in a new process. On POSIX, the class uses
:meth:`os.execvpe`-like behavior to execute the child program. On Windows,
Expand Down Expand Up @@ -684,6 +685,27 @@ functions.
.. versionadded:: 3.10
The ``pipesize`` parameter was added.

*read_stdout_callback*, if supplied, is called upon :data:`PIPE` data
being read for *stdout* of the child process. It must expect four
args: The :class:`Popen` instance, an ``output_buffer`` list of bytes
to append accumulated data to, and the ``data`` (bytes) just read from
the pipe. In order to maintain standard Popen behavior of collecting pipe
data to be available at the finish in the *stdout* attribute,
``output_buffer.append(data)`` must be called within the callback.

*read_stderr_callback*, as above, but for *stderr*.

.. versionadded: 3.11
The ``read_stdout_callback`` and ``read_stderr_callback`` parameters
were added.

Add a tee'ing handler that may be accessed by calling
``tee_pipe_to(handle)``. It takes the handle of the file to clone to
as an argument, such as *sys.stdout* or *sys.stderr*.

.. versionadded: 3.11
The ``tee_pipe_to()`` method was added.

Popen objects are supported as context managers via the :keyword:`with` statement:
on exit, standard file descriptors are closed, and the process is waited for.
::
Expand Down
44 changes: 41 additions & 3 deletions Lib/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ class Popen:
encoding and errors: Text mode encoding and error handling to use for
file objects stdin, stdout and stderr.

read_stdout_callback: Callable to handle reading from the output pipes.
It takes a Popen object, a list of bytes objects to append the
new bytes to, and the new bytes themselves.

read_stderr_callback: As above, but for stderr.

Attributes:
stdin, stdout, stderr, pid, returncode
"""
Expand All @@ -812,7 +818,8 @@ def __init__(self, args, bufsize=-1, executable=None,
restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
process_group=None):
process_group=None, read_stdout_callback=None,
read_stderr_callback=None):
"""Create new Popen instance."""
if not _can_fork_exec:
raise OSError(
Expand Down Expand Up @@ -864,6 +871,18 @@ def __init__(self, args, bufsize=-1, executable=None,
self.encoding = encoding
self.errors = errors
self.pipesize = pipesize
if read_stdout_callback is not None:
if not callable(read_stdout_callback):
raise ValueError("read_stdout_callback not a callable")
self.read_stdout_callback = read_stdout_callback
else:
self.read_stdout_callback = Popen._read_common_handler
if read_stderr_callback is not None:
if not callable(read_stderr_callback):
raise ValueError("read_stderr_callback not a callable")
self.read_stderr_callback = read_stderr_callback
else:
self.read_stderr_callback = Popen._read_common_handler

# Validate the combinations of text and universal_newlines
if (text is not None and universal_newlines is not None
Expand Down Expand Up @@ -1306,6 +1325,16 @@ def _close_pipe_fds(self,
# Prevent a double close of these handles/fds from __init__ on error.
self._closed_child_pipe_fds = True

def tee_pipe_to(output_fh):
def _tee_handler(self, buffer, data):
buffer.append(data)
output_fh.write(data)
return _tee_handler

def _read_common_handler(self, buffer, data):
"""Default handler for read_stdout_callback and read_stderr_callback."""
buffer.append(data)

if _mswindows:
#
# Windows methods
Expand Down Expand Up @@ -1565,7 +1594,12 @@ def _wait(self, timeout):


def _readerthread(self, fh, buffer):
buffer.append(fh.read())
is_stdout = fh == self.stdout
data = fh.read()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows uses threads for the stdout and stderr reading so that it can just do this single blocking unbounded blocking read(). Keeping this behavior when a read_callaback or tee is specified seems to defeat the purpose as it'll wait until the process has completed before calling the callback (tee'ing the output) all at once. That isn't what someone who has specified tee wants and probably isn't what someone who has specified their own read_callback wants. As there is little point in either of those if you don't get data asynchronously as it comes in.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does one then handle a disparity of rates between how quickly one can read from fh and how quickly they can write to sys.stdout? I don't see a simple solution, unless I've misunderstood what you're saying.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect any tee like process to handle this. The posix side of things in this code doesn't either, it reads up to 32k when the selector event says the pipe has data and calls the callback on that. if the callback blocks, the child process can thus also block on IO when one of its pipe buffers fills up.

there's also the issue of threading, if a callback might be called from a different thread rather than in the thread calling Popen.communicate, that needs to be documented.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if that's a constraint of the original design that we're just uncovering? Is that really an issue to fix here, or just something to call out and having been exposed...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original design consumes all pipe data as it comes in and buffers it as it is only available as a whole upon process exit when communicate() returns. The reason I bring this up here is that the tee and callback behavior would be inconsistent between Windows and posix such that the a read callback and tee wouldn't be very useful on Windows.

Copy link
Author

@pprindeville pprindeville May 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gpshead Do you want me to proceed with that change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gpshead I don't know how to proceed. It would seem that the functionality of this feature has additional contractual obligations on Windows that aren't implied anywhere else. Should I just limit the functionality of the baked-in "tee" functionality to non-Windows (i.e. POSIX) platforms instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've disabled the tee functionality for Windows.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still leaves Windows behind, it won't make callbacks in real time as data comes in. It isn't about tee so much as it's about having a similar user experience and behavior on both platforms. We need to conditionally do bounded read()s in a loop on windows when a callback is set.

From a user point of view "good enough" is likely be to call fh.readline(2000) in a loop in this situation. The things someone wants to see data from live are often text based so reading by line, if present, otherwise a limited buffer is roughly the same to non-blocking reads of everything available. something like this:

if not callback:
    buffer.append(fh.read())  # the existing logic
else:
    data = None
    while data != b'':
        data = fh.readline(2000)
        callback(self, buffer, data)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so much as it's about having a similar user experience and behavior on both platforms

It seems like we're bifurcating the behavior on Windows depending on whether the callback has been installed or not. Why not just have consistent and predictable behavior?

if is_stdout:
self.read_stdout_callback(self, buffer, data)
else:
self.read_stderr_callback(self, buffer, data)
fh.close()


Expand Down Expand Up @@ -2096,7 +2130,11 @@ def _communicate(self, input, endtime, orig_timeout):
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)
else:
if key.fileobj == self.stdout:
self.read_stdout_callback(self, stdout, data)
else:
self.read_stderr_callback(self, stderr, data)

self.wait(timeout=self._remaining_time(endtime))

Expand Down
116 changes: 94 additions & 22 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

mswindows = (sys.platform == "win32")

NEWLINE = b'\r\n' if mswindows else b'\n'

#
# Depends on the following external programs: Python
#
Expand Down Expand Up @@ -322,26 +324,98 @@ def test_stdout_none(self):
# parent's stdout. This test checks that the message printed by the
# child goes to the parent stdout. The parent also checks that the
# child's stdout is None. See #11963.
code = ('import sys; from subprocess import Popen, PIPE;'
'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
' stdin=PIPE, stderr=PIPE);'
'p.wait(); assert p.stdout is None;')
p = subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out.rstrip(), b'test_stdout_none')
code = ('import sys\n'
'from subprocess import Popen, PIPE\n'
'with Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],\n'
' stdin=PIPE, stderr=PIPE) as p:\n'
' p.wait()\n'
' assert p.stdout is None\n')
with subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as p:
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out, b'test_stdout_none' + NEWLINE)
self.assertEqual(err, b'')

def test_stdout_tee(self):
# .stdout is PIPE when not redirected, and the child's stdout will
# be inherited from the parent. In order to test this we run a
# subprocess in a subprocess:
# this_test
# \-- subprocess created by this test (parent)
# \-- subprocess created by the parent subprocess (child)
# The parent doesn't specify stdout, so the child will use the
# parent's stdout. This test checks that the message printed by the
# child goes to the parent stdout. The parent also checks that the
# child's stdout is cloned. See #47222.
code = ('import sys\n'
'from subprocess import Popen, PIPE\n'
'with Popen([sys.executable, "-c", "print(\'test_stdout_teed\')"],\n'
' stdout=PIPE, stderr=PIPE, read_stdout_callback=Popen.tee_pipe_to(sys.stdout.buffer)) as p:\n'
' out, err = p.communicate()\n'
' assert p.returncode == 0\n'
' assert out.rstrip() == b\'test_stdout_teed\'\n'
' assert err == b\'\'\n'
)
with subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as p:
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err.decode())
self.assertEqual(out, b'test_stdout_teed' + NEWLINE)
self.assertEqual(err, b'')

def test_stderr_none(self):
# .stderr is None when not redirected
p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stdin.close)
p.wait()
self.assertEqual(p.stderr, None)
with subprocess.Popen([sys.executable, "-c", 'print("banana")'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE) as p:
self.addCleanup(p.stdout.close)
self.addCleanup(p.stdin.close)
p.wait()
self.assertEqual(p.stderr, None)

def test_invalid_read_callback(self):
with self.assertRaises(ValueError) as cm:
with subprocess.Popen([sys.executable, '-c',
'import sys\n'
'print(\'foo\')\n'
'print(\'bar\', file=sys.stderr)\n'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
read_stdout_callback=True) as p:
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err.decode())
e = cm.exception
self.assertEqual(e.args, ('read_stdout_callback not a callable', ))

def test_custom_read_callback(self):
def my_read_stdout_callback(self, buffer, data):
buffer.append(b'<' + data + b'>')
def my_read_stderr_callback(self, buffer, data):
buffer.append(b'[' + data + b']')

with subprocess.Popen([sys.executable, '-c',
'import sys\n'
'print(\'foo\')\n'
'print(\'bar\', file=sys.stderr)\n'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
read_stdout_callback=my_read_stdout_callback,
read_stderr_callback=my_read_stderr_callback) as p:
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err.decode())
self.assertEqual(out, b'<foo' + NEWLINE + b'>')
self.assertEqual(err, b'[bar' + NEWLINE + b']')

def _assert_python(self, pre_args, **kwargs):
# We include sys.exit() to prevent the test runner from hanging
Expand Down Expand Up @@ -679,7 +753,7 @@ def test_stdout_filedes_of_stdout(self):
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out.rstrip(), b'test with stdout=1')
self.assertEqual(out, b'test with stdout=1')

def test_stdout_devnull(self):
p = subprocess.Popen([sys.executable, "-c",
Expand Down Expand Up @@ -2678,8 +2752,7 @@ def test_undecodable_env(self):
stdout = subprocess.check_output(
[sys.executable, "-c", script],
env=env)
stdout = stdout.rstrip(b'\n\r')
self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
self.assertEqual(stdout.decode('ascii'), ascii(decoded_value) + NEWLINE.decode())

# test bytes
key = key.encode("ascii", "surrogateescape")
Expand All @@ -2689,8 +2762,7 @@ def test_undecodable_env(self):
stdout = subprocess.check_output(
[sys.executable, "-c", script],
env=env)
stdout = stdout.rstrip(b'\n\r')
self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
self.assertEqual(stdout.decode('ascii'), ascii(encoded_value) + NEWLINE.decode())

def test_bytes_program(self):
abs_program = os.fsencode(ZERO_RETURN_CMD[0])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
In the :mod:`subprocess` :class:`Popen`, factor the read callback handler for the ``PIPE`` file descriptors into its own function. Also add class variables pointing to the *stdout* and *stderr* read handlers. Lastly, add constructor variable ``read_stdout_callback`` and ``read_stderr_callback`` to allow overriding the handler with a user-supplied function.

Also add the method ``tee_pipe_to(handle)`` to this class, which indicates that output on the pipes should be copied to the parent's buffers, as well as being cloned onto the named *handle*.