Subprocess.Popen: cloning stdout and stderr both to terminal and variables

Is it possible to modify code below to have printout from ‘stdout ‘and ‘stderr’:

  • printed on the terminal (in real time),
  • and finally stored in outs and errs variables?

The code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess

def run_cmd(command, cwd=None):
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    rc = p.returncode
    outs = outs.decode('utf-8')
    errs = errs.decode('utf-8')

    return (rc, (outs, errs))

Thanks to @unutbu, special thanks for @j-f-sebastian, final function:

#!/usr/bin/python3
# -*- coding: utf-8 -*-


import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    for line in iter(pipe.readline, b''):
        for func in funcs:
            func(line.decode('utf-8'))
    pipe.close()


def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)


def run_cmd(command, cwd=None, passthrough=True):
    outs, errs = None, None

    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1
        )

    if passthrough:

        outs, errs = [], []

        q = Queue()

        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append])
            )

        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append])
            )

        writer_thread = Thread(
            target=write_output, args=(q.get,)
            )

        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()

        proc.wait()

        for t in (stdout_thread, stderr_thread):
            t.join()

        q.put(None)

        outs = ' '.join(outs)
        errs = ' '.join(errs)

    else:

        outs, errs = proc.communicate()
        outs = '' if outs == None else outs.decode('utf-8')
        errs = '' if errs == None else errs.decode('utf-8')

    rc = proc.returncode

    return (rc, (outs, errs))

Answers:

Thank you for visiting the Q&A section on Magenaut. Please note that all the answers may not help you solve the issue immediately. So please treat them as advisements. If you found the post helpful (or not), leave a comment & I’ll get back to you as soon as possible.

Method 1

To capture and display at the same time both stdout and stderr from a child process line by line in a single thread, you could use asynchronous I/O:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE

@asyncio.coroutine
def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines.

    """
    output = []
    while True:
        line = yield from stream.readline()
        if not line:
            break
        output.append(line)
        display(line) # assume it doesn't block
    return b''.join(output)

@asyncio.coroutine
def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive
    (line by line).

    """
    # start process
    process = yield from asyncio.create_subprocess_exec(*cmd,
            stdout=PIPE, stderr=PIPE)

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = yield from asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        # wait for the process to exit
        rc = yield from process.wait()
    return rc, stdout, stderr

# run the event loop
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()

Method 2

You could spawn threads to read the stdout and stderr pipes, write to a common queue, and append to lists. Then use a third thread to print items from the queue.

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()

def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

The reason for using the third thread — instead of letting the first two threads both print directly to the terminal — is to prevent both print statements from occurring concurrently, which can result in sometimes garbled text.


The above calls random_print.py, which prints to stdout and stderr at random:

import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout,sys.stderr])
    f.write(str(i)+'n')
    f.flush()
    time.sleep(0.1)

This solution borrows code and ideas from J. F. Sebastian, here.


Here is an alternative solution for Unix-like systems, using select.select:

import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''

def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break

fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

This solution uses code and ideas from Adam Rosenfield’s post, here.


All methods was sourced from stackoverflow.com or stackexchange.com, is licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x