How to capture streaming output from subprocess.communicate() in Python
Question
Currently, I have something like this:
self.process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE)
out, err = self.process.communicate()
The command I'm running streams its output, and I need the process to block before continuing.
How do I make it so that I can capture the streaming output AND have the streaming output printed through stdout? When I set stdout=subprocess.PIPE, I can capture the output, but it won't be printed. If I leave out stdout=subprocess.PIPE, the output is printed, but communicate() returns None.
Is there a solution that does what I'm asking for WHILE blocking until the process is terminated/completed AND avoiding the buffer issues and pipe deadlock issues mentioned here?

Thanks!
Solution
I can think of a few solutions.
#1: You can just go into the source to grab the code for communicate(), copy and paste it, and add code that prints each line as it comes in as well as buffering things up. (If it's possible for your own stdout to block because of, say, a deadlocked parent, you can use a threading.Queue or something instead.) This is obviously a bit hacky, but it's pretty easy, and it's safe.
But really, communicate() is complicated because it needs to be fully general and handle cases you don't have. All you need here is the central trick: throw threads at the problem. A dedicated reader thread that doesn't do anything slow or blocking between read calls is all you need.
Something like this:
self.process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE)
lines = []

def reader():
    # On Python 3 the pipe yields bytes; pass universal_newlines=True
    # (or text=True) to Popen above if you want str lines here.
    for line in self.process.stdout:
        lines.append(line)
        sys.stdout.write(line)

t = threading.Thread(target=reader)
t.start()
self.process.wait()
t.join()
You may need some error handling in the reader thread. And I'm not 100% sure you can safely use readline here. But this will either work, or be close.
#2: Or you can create a wrapper class that takes a file object and tees to stdout/stderr every time anyone reads from it. Then create the pipes manually, and pass in wrapped pipes, instead of using the automagic PIPE. This has exactly the same issues as #1 (meaning either no issues, or you need to use a Queue or something if sys.stdout.write can block).
Something like this:
class TeeReader(object):
    def __init__(self, input_file, tee_file):
        self.input_file = input_file
        self.tee_file = tee_file
    def read(self, size=-1):
        ret = self.input_file.read(size)
        if ret:
            self.tee_file.write(ret)
        return ret
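A sketch of how the wrapper might be used, with the class repeated so the example is self-contained (the child command here is just an illustration):

```python
import subprocess
import sys

class TeeReader(object):
    """Same class as above, repeated for a runnable example."""
    def __init__(self, input_file, tee_file):
        self.input_file = input_file
        self.tee_file = tee_file
    def read(self, size=-1):
        ret = self.input_file.read(size)
        if ret:
            self.tee_file.write(ret)
        return ret

process = subprocess.Popen(
    [sys.executable, "-c", "print('hello')"], stdout=subprocess.PIPE)
tee = TeeReader(process.stdout, sys.stdout.buffer)  # pipe yields bytes
captured = tee.read()  # echoes to the terminal while capturing
process.wait()
```

Every read both fills `captured` and echoes to the tee target, which is the whole trick.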
In other words, it wraps a file object (or something that acts like one), and acts like a file object. (When you use PIPE, process.stdout is a real file object on Unix, but may just be something that acts like one on Windows.) Any other methods you need to delegate to input_file can probably be delegated directly, without any extra wrapping. Either try this and see which methods communicate() raises AttributeError looking for and code those explicitly, or do the usual __getattr__ trick to delegate everything. PS, if you're worried about this "file object" idea meaning disk storage, read Everything is a file at Wikipedia.
#3: Finally, you can grab one of the "async subprocess" modules on PyPI, or one included in twisted or another async framework, and use that. (This makes it possible to avoid the deadlock problems, but it's not guaranteed: you still have to make sure to service the pipes properly.)