How do I pass large numpy arrays between python subprocesses without saving to disk?
Problem description
Is there a good way to pass a large chunk of data between two python subprocesses without using the disk? Here's a cartoon example of what I'm hoping to accomplish:
import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')
This creates a subprocess which generates a numpy array and saves the array to disk. The parent process then loads the array from disk. It works!
The problem is, our hardware can generate data 10x faster than the disk can read/write. Is there a way to transfer data from one python process to another purely in-memory, maybe even without making a copy of the data? Can I do something like passing-by-reference?
My first attempt at transferring data purely in-memory is pretty lousy:
import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this is NFG if there's a '10' in the array:
        sys.stdout.write(data.tostring() + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done\n')
This is extremely slow (much slower than saving to disk) and very, very fragile. There's got to be a better way!
I'm not married to the 'subprocess' module, as long as the data-taking process doesn't block the parent application. I briefly tried 'multiprocessing', but without success so far.
Background: We have a piece of hardware that generates up to ~2 GB/s of data in a series of ctypes buffers. The python code to handle these buffers has its hands full just dealing with the flood of information. I want to coordinate this flow of information with several other pieces of hardware running simultaneously in a 'master' program, without the subprocesses blocking each other. My current approach is to boil the data down a little bit in the subprocess before saving to disk, but it'd be nice to pass the full monty to the 'master' process.
Solution
While googling around for more information about the code Joe Kington posted, I found the numpy-sharedmem package. Judging from this numpy/multiprocessing tutorial it seems to share the same intellectual heritage (maybe largely the same authors? I'm not sure).
Using the sharedmem module, you can create a shared-memory numpy array (awesome!), and use it with multiprocessing like this:
import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q, arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd = np.random.randint(100)
            print('rnd={0}'.format(rnd))
            # Write in place so the parent sees the change without a copy
            arr[:] = rnd
        q.task_done()

if __name__ == '__main__':
    N = 10
    arr = shm.zeros(N, dtype=np.uint8)
    q = mp.JoinableQueue()
    proc = mp.Process(target=worker, args=[q, arr])
    proc.daemon = True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()
        print(arr.shape)
        print(arr)
    q.put('done')
    proc.join()
Running this yields
rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
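For readers on Python 3.8 or later, the standard library's multiprocessing.shared_memory module can play a role similar to the third-party sharedmem package used above. What follows is a minimal sketch under stated assumptions, not the method from the original answer: the names CHILD_CODE and fill_from_child are hypothetical, the fill value 42 stands in for real hardware data, and the child is launched with subprocess to mirror the question's setup. On Python versions before 3.13 the child's resource tracker may remove the segment when the child exits, so the parent tolerates a FileNotFoundError on cleanup (a warning may still appear on stderr).

```python
import subprocess
import sys
from multiprocessing import shared_memory

import numpy as np

# Hypothetical child program: attaches to an existing segment by name
# and fills it in place. 42 is a stand-in for data read from hardware.
CHILD_CODE = r"""
import sys
from multiprocessing import shared_memory
import numpy as np

name, n = sys.argv[1], int(sys.argv[2])
try:
    # Python 3.13+: do not let this process's tracker own the segment
    shm = shared_memory.SharedMemory(name=name, track=False)
except TypeError:
    # Older Pythons have no 'track' parameter
    shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray((n,), dtype=np.uint8, buffer=shm.buf)
arr[:] = 42
del arr          # drop the view before closing the mapping
shm.close()
"""


def fill_from_child(n=10):
    """Create a shared segment, let a child process fill it, return the data."""
    shm = shared_memory.SharedMemory(create=True, size=n)
    try:
        subprocess.run(
            [sys.executable, "-c", CHILD_CODE, shm.name, str(n)],
            check=True,
        )
        # Zero-copy view onto the bytes the child wrote
        view = np.ndarray((n,), dtype=np.uint8, buffer=shm.buf)
        # Copy only so the result outlives the segment in this demo
        result = view.copy()
        del view
        return result
    finally:
        shm.close()
        try:
            shm.unlink()
        except FileNotFoundError:
            # Assumption: on Python < 3.13 the child's tracker may already
            # have unlinked the segment
            pass


if __name__ == "__main__":
    print(fill_from_child(10))
```

In a long-running acquisition loop the parent would keep the view open and work on it directly rather than copying; the copy here only lets the function return after the segment is released. As in the sharedmem example, the two processes still need some signal (a queue, a pipe message) to agree on when the buffer is ready to read.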