Multiple pipes in subprocess

I am trying to use Sailfish, which takes multiple fastq files as arguments, in a ruffus pipeline. I run Sailfish through Python's subprocess module, but the <(...) process substitution in the command does not work, even when I set shell=True.

This is the command I want to execute from Python:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]

or (preferably):

sailfish quant [options] -1 <(gunzip -c sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip -c sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]

A generalization:

someprogram <(someprocess) <(someprocess)

How would I go about doing this in Python? Is subprocess the right approach?

Answers:


Method 1

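With shell=True, subprocess runs the command through /bin/sh by default, and /bin/sh may not support process substitution. To use bash process substitution, name bash explicitly as the executable: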

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')
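If the commands are assembled from file names, quoting each argument keeps spaces and shell metacharacters from breaking the command line. The following is a minimal sketch, not part of the original answer: the helper name process_substitution is hypothetical, and cat and echo stand in for someprogram and its input processes. It assumes /bin/bash exists on the machine.

```python
import shlex
from subprocess import check_output


def process_substitution(argv):
    """Return the bash fragment <(command) for an argument list (hypothetical helper)."""
    return "<(" + " ".join(shlex.quote(arg) for arg in argv) + ")"


# Stand-ins: `cat` plays the role of someprogram, `echo` the input processes.
command = " ".join([
    "cat",
    process_substitution(["echo", "hello"]),
    process_substitution(["echo", "world"]),
])

# bash is named explicitly because the default /bin/sh may not support <(...).
output = check_output(command, shell=True, executable="/bin/bash")
print(command)
print(output)
```

For the Sailfish case, the same helper could wrap each cat or gunzip -c argument list, with the -1, -2 and -o options joined around the fragments.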

To do the same from Python without relying on bash, you could use named pipes:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()

where named_pipes(n) is:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)
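To try the named-pipe approach end to end, here is a self-contained sketch that inlines the two snippets above and substitutes real commands: cat for someprogram and echo for the input processes (both are stand-ins, not part of the original answer). The consumer is started first because opening a FIFO for writing blocks until some process opens it for reading.

```python
import os
import shutil
import tempfile
from subprocess import Popen, PIPE

dirname = tempfile.mkdtemp()
try:
    paths = [os.path.join(dirname, "named_pipe%d" % i) for i in range(2)]
    for path in paths:
        os.mkfifo(path)

    # Start the reader first; it blocks on each FIFO until a writer appears.
    consumer = Popen(["cat"] + paths, stdout=PIPE)

    producers = []
    for path, command in zip(paths, [["echo", "hello"], ["echo", "world"]]):
        # open() returns once the consumer has opened this FIFO for reading.
        with open(path, "wb", 0) as pipe:
            producers.append(Popen(command, stdout=pipe))

    output = consumer.communicate()[0]
    for p in producers:
        p.wait()
finally:
    shutil.rmtree(dirname)  # remove the FIFOs and their directory

print(output)
```

Because cat reads its arguments in order, the parent's second open() simply waits until cat has finished the first pipe and moved on to the second.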

Another, preferable way to implement bash process substitution (it needs no named entry on disk) is to use /dev/fd/N filenames, if they are available, as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

If it fails, try to symlink /dev/fd to proc(5), as is done on some Linux systems:

$ ln -s /proc/self/fd /dev/fd
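The availability test can also be run from Python before choosing between the named-pipe and /dev/fd approaches. This is a rough sketch mirroring the shell test above; the function name dev_fd_available is made up for illustration.

```python
import os


def dev_fd_available():
    """Return True if an open descriptor is readable via /dev/fd/N (hypothetical helper)."""
    fd = os.open(os.devnull, os.O_RDONLY)
    try:
        return os.access("/dev/fd/%d" % fd, os.R_OK)
    finally:
        os.close(fd)


print(dev_fd_available())
```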

Here's a /dev/fd-based implementation of the bash command someprogram <(someprocess) <(anotherprocess):

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None: # still running
        process.kill()

with ExitStack() as stack: # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1]) # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes: # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)

Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.

Method 2

Whilst J.F. Sebastian has provided an answer using named pipes, it is also possible to do this with anonymous pipes.

import shlex
from subprocess import Popen, PIPE

inputcmd0 = "zcat hello.gz" # gzipped file containing "hello"
inputcmd1 = "zcat world.gz" # gzipped file containing "world"

def get_filename(file_):
    return "/dev/fd/{}".format(file_.fileno())

def get_stdout_fds(*processes):
    return tuple(p.stdout.fileno() for p in processes)

# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)

# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), 
    file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# pass_fds argument tells Popen to let the child process inherit the pipe's fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE, 
    pass_fds=get_stdout_fds(inputproc0, inputproc1))

output, error = someprogram.communicate()

for p in [inputproc0, inputproc1, someprogram]:
    p.wait()

assert output == b"hello\nworld\n"
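The code above expects hello.gz and world.gz to exist already. The variant below is a self-contained sketch of the same anonymous-pipe technique that first creates the two gzipped files in a temporary directory. It assumes /dev/fd is available and that gzip and cat are on PATH; the function name concatenate_decompressed is hypothetical, and gzip -dc is used as an equivalent of zcat.

```python
import gzip
import os
import shutil
import tempfile
from subprocess import Popen, PIPE


def concatenate_decompressed(paths):
    """Roughly: cat <(gzip -dc path1) <(gzip -dc path2) ..., returning cat's output."""
    producers = [Popen(["gzip", "-dc", path], stdout=PIPE) for path in paths]
    fds = [p.stdout.fileno() for p in producers]
    # pass_fds lets the consumer inherit the read ends of the pipes.
    consumer = Popen(["cat"] + ["/dev/fd/%d" % fd for fd in fds],
                     stdout=PIPE, pass_fds=fds)
    for p in producers:
        p.stdout.close()  # the parent itself never reads from these pipes
    output = consumer.communicate()[0]
    for p in producers:
        p.wait()
    return output


tmpdir = tempfile.mkdtemp()
try:
    paths = []
    for word in ("hello", "world"):
        path = os.path.join(tmpdir, word + ".gz")
        with gzip.open(path, "wb") as f:
            f.write(word.encode("ascii") + b"\n")
        paths.append(path)
    output = concatenate_decompressed(paths)
finally:
    shutil.rmtree(tmpdir)

print(output)
```

For the original question, cat would be replaced by the sailfish command line, with each /dev/fd/N path placed after its -1 or -2 option.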


All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
