Making my NumPy array shared across processes

Problem description

I have read quite a few of the questions on SO about sharing arrays, and it seems simple enough for simple arrays, but I am stuck trying to get it working for the array I have.

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')

I have tried converting this to a shared array by trying to somehow make mp.Array accept the data; I have also tried creating the array using ctypes, like so:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

The only way I have managed to get my code working is by not passing data to the function, but instead passing an encoded string to be uncompressed/decoded. This would, however, end up calling n (number of strings) processes, which seems redundant. My desired implementation is based on slicing the list of binary strings into x (number of processes) chunks and passing each chunk, data, and an index to a process. This works, except that data is modified locally; hence the question of how to make it shared. Any example working with a custom (nested) NumPy array would already be a great help.

PS: This question is a follow-up to Python multiprocessing.


Solution

Note that you can start out with an array of complex dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and later, convert it back to complex dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype is a very quick operation; it does not affect the underlying data, only the way NumPy interprets it. So changing the dtype is virtually costless.

So what you've read about arrays with simple (homogeneous) dtypes can be readily applied to your complex dtype with the trick above.
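
To make the no-copy claim concrete, here is a small self-contained check (using a scaled-down version of the question's dtype so it runs cheaply); np.shares_memory confirms that both views refer to the same buffer:

import numpy as np

# Scaled-down version of the question's dtype: one float32 plus a (5, 2) float32 block per record.
data = np.zeros(3, dtype='float32, (5,2)float32')

# Reinterpret the same buffer as a flat float32 array -- no bytes are copied.
flat = data.view('float32')
assert np.shares_memory(data, flat)
assert flat.size == 3 * (1 + 5 * 2)

# A write through the flat view is immediately visible through the structured view...
flat[0] = 1.5
assert data[0]['f0'] == 1.5

# ...and the flat view can be reinterpreted back to the structured dtype at any time.
data3 = flat.view('float32, (5,2)float32')
assert data3.dtype == data.dtype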

The code below borrows from J.F. Sebastian's answer, here.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print(len(chunk), counter)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4  # 4 bytes per packed 32-bit value
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            # Reinterpret each 32-bit integer's bit pattern as a float32.
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                # Values alternate: even index -> m/z, odd index -> intensity.
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    # The shared array holds C floats, so tell frombuffer the element type.
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = int(len(peaks) / processors)
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)
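
After numpy_array returns, the parent process can read the results by reinterpreting the shared buffer with the same structured dtype. A minimal sketch, reusing tonumpyarray and shared_arr from the code above (the commented shapes are just what I would expect, not output from the original answer):

result = tonumpyarray(shared_arr).view(
    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
print(result.shape)         # expected: (250,)
print(result[0]['f1'][:5])  # first five (m/z, intensity) pairs of the first record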

If you can guarantee that the various processes which execute the assignments

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the data in the same locations, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't grok your code well enough to know for sure, so to be on the safe side, I included the lock.
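
One way to drop the lock entirely (my suggestion, not something from the original answer) is to allocate the buffer with mp.RawArray, which carries no lock at all, and let the workers write to their disjoint slots without synchronization. A sketch under that assumption:

import ctypes
import multiprocessing as mp
import numpy as np

# Lock-free variant: RawArray has no associated lock and no .get_obj();
# it supports the buffer protocol directly.
shared_arr = mp.RawArray(ctypes.c_float, (250000 * 2 * 250) + 250)

def tonumpyarray(raw_arr):
    return np.frombuffer(raw_arr, dtype=np.float32)

# Inside decode(), the "with shared_arr.get_lock():" block then reduces to
# a plain assignment:
#     data = tonumpyarray(shared_arr).view(
#         [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
#     data[counter][1][peak_counter][0] = float(buff2)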
