Blocking - sending input to a python subprocess pipeline

2022-01-18 00:00:00 python subprocess ipc pipe blocking

Problem description

I'm testing subprocess pipelines with python. I'm aware that I can do what the programs below do in python directly, but that's not the point. I just want to test the pipeline so I know how to use it.

My system is Linux Ubuntu 9.04 with default python 2.6.

I started with this documentation example:

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D closing stdin, I get the output I want.

However, I want to send data to the pipe using a python string variable. First I tried writing on stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Didn't work. I tried using p2.stdout.read() instead on the last line, but it also blocks. I added p1.stdin.flush() and p1.stdin.close() but it didn't work either. Then I moved on to communicate():

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

So that still wasn't it.

I noticed that running a single process (like p1 above, removing p2) works perfectly. And passing a file handle to p1 (stdin=open(...)) also works, as sketched below.
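
A minimal sketch of that working file-handle variant ('input.txt' is a hypothetical file holding the test data):

from subprocess import Popen, PIPE

# Feeding the pipeline from a file works fine
p1 = Popen(["grep", "-v", "not"], stdin=open('input.txt'), stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.communicate()[0]

So the problem is: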

Is it possible to pass data to a pipeline of 2 or more subprocesses in python, without blocking? Why not?

I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.
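
Just to make explicit what is being rejected, that shell approach would be something like the following, where the shell rather than python does the plumbing:

from subprocess import Popen, PIPE

# The whole pipeline lives inside one shell command
p = Popen("grep -v not | cut -c 1-10", shell=True, stdin=PIPE, stdout=PIPE)
print p.communicate('test\n')[0]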

UPDATE 1: Following Aaron Digulla's hint below I'm now trying to use threads to make it work.

First I've tried running p1.communicate on a thread.

import threading
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Okay, didn't work. Tried other combinations like changing it to .write() and also p2.read(). Nothing. Now let's try the opposite approach:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

The code ends up blocking somewhere, either in the spawned thread, in the main thread, or both. So it didn't work. If you know how to make it work, it would be easier if you could provide working code. I'm still trying here.

UPDATE 2

Paul Du Bois answered below with some information, so I did more tests. I've read the entire subprocess.py module and understood how it works. So I tried applying exactly that to the code.

I'm on linux, but since I was testing with threads, my first approach was to replicate the windows threading code from subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Well. It didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocks.

Then I tried the posix code from subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n')
            p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

select.select() 上也会阻塞.通过传播prints,我发现了这一点:

It also blocks, on select.select(). By spreading prints around, I found out the following:

  • Reading is working. The code reads many times during execution.
  • Writing is also working. Data is written to p1.stdin.
  • At the end of numwrites, p1.stdin.close() is called.
  • When select() starts blocking, only to_read has something left in it, p2.stdout. to_write is already empty.
  • The os.read() call always returns something, so p2.stdout.close() is never called.

Conclusion from both tests: closing the stdin of the first process in the pipeline (grep in the example) does not make it dump its buffered output to the next process and die.
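
(A quick check that confirms this, if added at the point where either listing hangs; p1.poll() returns None for a live process, and an exit code for a finished one:)

print p1.poll()  # None here: grep is still alive despite p1.stdin.close()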

Is there no way to make it work?

PS: I don't want to use a temporary file, I've already tested with files and I know it works. And I don't want to use windows.


Solution

I found out how to make it work.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a. So grep can't die even if I close a, because a is still open in cut's process (cut just ignores it).
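
On Linux this is easy to see by peeking at /proc (a diagnostic sketch only, Linux-specific, not part of the fix):

import os
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

# Print each child's open file descriptors. cut shows one pipe fd more
# than expected: the inherited copy of the write end of grep's stdin.
for p in (p1, p2):
    fd_dir = '/proc/%d/fd' % p.pid
    print p.pid, [os.readlink(os.path.join(fd_dir, fd))
                  for fd in os.listdir(fd_dir)]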

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl
"

close_fds=True SHOULD BE THE DEFAULT on unix systems. On windows it closes all fds, so it prevents piping.

PS: For people reading this answer with a similar problem: as pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the buffers. In that case you should chunk the data into smaller pieces, and use select.select() to know when it is safe to read/write. The code in the question should give a hint on how to implement that.
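
A rough sketch of that idea (an untested outline under the same assumptions, reusing the question's grep/cut pipeline plus the close_fds fix from above, not code from the answer itself):

import os
import select
from subprocess import Popen, PIPE

def run_pipeline(data):
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
               close_fds=True)  # avoid the fd inheritance problem

    chunks = []
    to_read, to_write = [p2.stdout], [p1.stdin]
    pos = 0
    while to_read or to_write:
        readable, writable, _ = select.select(to_read, to_write, [])
        if readable:
            chunk = os.read(p2.stdout.fileno(), 4096)
            if chunk:
                chunks.append(chunk)
            else:
                p2.stdout.close()
                to_read = []
        if writable:
            if pos < len(data):
                # Write a small piece; os.write() returns how much fit
                pos += os.write(p1.stdin.fileno(), data[pos:pos + 4096])
            else:
                p1.stdin.close()
                to_write = []
    return ''.join(chunks)

print run_pipeline('hello world\n' * 100000)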

Found another solution, with more help from pooryorick - instead of using close_fds=True and closing ALL fds, one can close just the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument of Popen comes in very handy to do exactly that. On executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
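
Filling in the placeholders with the grep/cut example used throughout (a minimal sketch; cmd2 and devnull in the one-liner above are the original answer's placeholders):

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# preexec_fn runs in the child after fork() and before exec(), so this
# closes the child's inherited copy of the write end of grep's stdin.
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           preexec_fn=p1.stdin.close)
p1.stdin.write('Hello World\n')
p1.stdin.close()
print p2.stdout.read()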
