如何在 python 子进程之间传递大型 numpy 数组而不保存到磁盘?

问题描述

有没有一种在不使用磁盘的情况下在两个 python 子进程之间传递大量数据的好方法?这是我希望完成的卡通示例:

Is there a good way to pass a large chunk of data between two python subprocesses without using the disk? Here's a cartoon example of what I'm hoping to accomplish:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data
')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done
')

这将创建一个子进程,该子进程生成一个 numpy 数组并将该数组保存到磁盘.然后父进程从磁盘加载阵列.有效!

This creates a subprocess which generates a numpy array and saves the array to disk. The parent process then loads the array from disk. It works!

问题是,我们的硬件生成数据的速度比磁盘读取/写入的速度快 10 倍.有没有办法将数据从一个 python 进程传输到另一个纯粹在内存中的进程,甚至可能不复制数据?我可以做类似通过引用传递的事情吗?

The problem is, our hardware can generate data 10x faster than the disk can read/write. Is there a way to transfer data from one python process to another purely in-memory, maybe even without making a copy of the data? Can I do something like passing-by-reference?

我第一次尝试纯粹在内存中传输数据非常糟糕:

My first attempt at transferring data purely in-memory is pretty lousy:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this is NFG if there's a '10' in the array:
        sys.stdout.write(data.tostring() + '\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data
')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done
')

这非常慢(比保存到磁盘慢得多)并且非常非常脆弱.一定有更好的方法!

This is extremely slow (much slower than saving to disk) and very, very fragile. There's got to be a better way!

只要数据获取进程不阻塞父应用程序,我就不会与子进程"模块结婚.我短暂地尝试了多处理",但到目前为止没有成功.

I'm not married to the 'subprocess' module, as long as the data-taking process doesn't block the parent application. I briefly tried 'multiprocessing', but without success so far.

背景:我们有一个硬件可以在一系列 ctypes 缓冲区中生成高达 ~2 GB/s 的数据.处理这些缓冲区的 python 代码只是处理大量的信息.我想将此信息流与在主"程序中同时运行的其他几个硬件协调,而子进程不会相互阻塞.我目前的方法是在保存到磁盘之前在子进程中将数据煮沸一点,但最好将完整的数据传递给主"进程.

Background: We have a piece of hardware that generates up to ~2 GB/s of data in a series of ctypes buffers. The python code to handle these buffers has its hands full just dealing with the flood of information. I want to coordinate this flow of information with several other pieces of hardware running simultaneously in a 'master' program, without the subprocesses blocking each other. My current approach is to boil the data down a little bit in the subprocess before saving to disk, but it'd be nice to pass the full monty to the 'master' process.


解决方案

在搜索有关 Joe Kington 发布的代码的更多信息时,我发现了 numpy-sharedmem 包.从这个 numpy/multiprocessing 教程来看,它似乎共享相同的知识遗产(可能大致相同作者?——我不确定).

While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial it seems to share the same intellectual heritage (maybe largely the same authors? -- I'm not sure).

使用 sharedmem 模块,您可以创建一个共享内存 numpy 数组(太棒了!),并将其与 多处理 像这样:

Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()   
        print arr.shape
        print(arr)
    q.put('done')
    proc.join()

运行产量

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]

相关文章