How to speed up communication with subprocesses

Problem description

I am using the Python 2 subprocess module with threading threads to take standard input, process it with binaries A, B, and C, and write the modified data to standard output.

This script (let's call it A_to_C.py) is very slow, and I'd like to learn how to fix it.

The general flow looks like this:

A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))

B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))

C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin)) 

produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()

produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()

A_process.wait()
B_process.wait()
C_process.wait()

The idea is that standard input goes into A_to_C.py:

  1. The A binary processes a chunk of standard input and creates A-output via the function produceA.
  2. The B binary processes a chunk of A's standard output and creates B-output via the function produceB.
  3. The C binary processes a chunk of B's standard output via the function produceC and writes C-output to standard output.

I profiled the script with cProfile, and nearly all of its time appears to be spent acquiring thread locks.

For instance, in a 417 s test job, 416 s (over 99% of the total runtime) was spent acquiring thread locks:

$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014    1.profile

         1755 function calls (1752 primitive calls) in 417.203 CPU seconds

   Ordered by: cumulative time
   List reduced from 162 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.020    0.020  417.203  417.203 A_to_C.py:90(<module>)
        1    0.000    0.000  417.123  417.123 A_to_C.py:809(main)
        6    0.000    0.000  416.424   69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
       32  416.424   13.013  416.424   13.013 {method 'acquire' of 'thread.lock' objects}
        3    0.000    0.000  416.422  138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
        3    0.000    0.000    0.498    0.166 A_to_C.py:473(which)
       37    0.000    0.000    0.498    0.013 A_to_C.py:475(is_exe)
        3    0.496    0.165    0.496    0.165 {posix.access}
        6    0.000    0.000    0.194    0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
        3    0.000    0.000    0.191    0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)

What am I doing wrong in my threading.Thread and/or subprocess.Popen arrangement that causes this?


Solution

Your calls to subprocess.Popen() implicitly use the default value of bufsize, which is 0 in Python 2 and forces unbuffered I/O. Try passing a reasonable buffer size (4K, 16K, even 1M) and see if it makes any difference.
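
As a rough illustration of the bufsize suggestion, here is a minimal sketch of a two-stage pipeline with buffered pipes. The helper names (spawn, pump, feed, run_chain), the 64K BUFSIZE value, and UPPERCASE_FILTER (a child Python process standing in for binaries such as A or B) are all assumptions made for this example, not part of the original script. The close_fds=True argument is also an addition: it is not the default in Python 2, and without it a later child can inherit the write end of an earlier child's stdin pipe and keep that child from ever seeing EOF.

```python
import subprocess
import sys
import threading

BUFSIZE = 64 * 1024  # assumed value; measure 4K, 16K, 1M, ... on real data

# Hypothetical stand-in for a real binary: upper-cases stdin to stdout.
UPPERCASE_FILTER = [sys.executable, "-c",
                    "import sys; sys.stdout.write(sys.stdin.read().upper())"]


def spawn(argv, bufsize=BUFSIZE):
    """Start a filter process whose pipe file objects are buffered."""
    return subprocess.Popen(argv,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            bufsize=bufsize,   # instead of Python 2's default 0
                            close_fds=True)    # do not leak pipe ends to siblings


def pump(src, dst, chunk_size=BUFSIZE):
    """Copy src to dst in large chunks, then close dst so the reader sees EOF."""
    try:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dst.write(chunk)
    finally:
        dst.close()


def feed(dst, data):
    """Write all of data to dst, then close it."""
    try:
        dst.write(data)
    finally:
        dst.close()


def run_chain(first_argv, second_argv, data):
    """Send data through two filter processes connected by a pump thread."""
    first = spawn(first_argv)
    second = spawn(second_argv)
    threads = [
        threading.Thread(target=feed, args=(first.stdin, data)),
        threading.Thread(target=pump, args=(first.stdout, second.stdin)),
    ]
    for t in threads:
        t.start()
    result = second.stdout.read()  # drain the last stage in the main thread
    for t in threads:
        t.join()
    first.stdout.close()
    second.stdout.close()
    first.wait()
    second.wait()
    return result
```

For example, run_chain(UPPERCASE_FILTER, UPPERCASE_FILTER, b"some bytes") pushes the input through two child processes. Whether a larger buffer actually helps depends on how the real produceA, produceB, and produceC functions read and write, so it is worth timing a few sizes rather than assuming one.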
