numpy with multiprocessing and mmap

2022-01-12 · python, numpy, multiprocessing, mmap

Problem Description


I am using Python's multiprocessing module to process large numpy arrays in parallel. The arrays are memory-mapped using numpy.load(mmap_mode='r') in the master process. After that, multiprocessing.Pool() forks the process (I presume).


Everything seems to work fine, except I am getting lines like:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored


in the unittest logs. The tests pass fine, nevertheless.


Any idea what's going on there?


Using Python 2.7.2, OS X, NumPy 1.6.1.

Update:


After some debugging, I hunted down the cause to a code path that was using a (small slice of) this memory-mapped numpy array as input to a Pool.imap call.


Apparently the "issue" is with the way multiprocessing.Pool.imap passes its input to the new processes: it uses pickle. This doesn't work with memory-mapped numpy arrays, and something inside breaks, which leads to the error.


I found this reply by Robert Kern which seems to address the same issue. He suggests creating a special code path for when the imap input comes from a memory-mapped array: memory-mapping the same array manually in the spawned process.
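For reference, a minimal sketch of what that special code path might look like, assuming a fork-based Pool. The helper names and the (start, stop) index protocol are illustrative, not taken from the linked reply: each worker re-opens the memmap once in an initializer, so only small index tuples are ever pickled.

```python
import numpy as np
import multiprocessing

_data = None  # per-worker handle to the memmap, set by the initializer

def init_worker(path, dtype):
    """Re-open the memory-mapped file inside each worker process."""
    global _data
    _data = np.memmap(path, dtype=dtype, mode='r')

def calc_range(bounds):
    """Compute on a slice of the worker's own memmap; only indices travel."""
    start, stop = bounds
    chunk = _data[start:stop]
    return chunk.mean() - chunk.std()

def process_file(path, chunksize=100, dtype=np.float64):
    data = np.memmap(path, dtype=dtype, mode='r')
    bounds = [(i, min(i + chunksize, data.size))
              for i in range(0, data.size, chunksize)]
    # 'fork' keeps the example self-contained; only (start, stop)
    # pairs are pickled, never the memmap itself.
    ctx = multiprocessing.get_context('fork')
    with ctx.Pool(initializer=init_worker, initargs=(path, dtype)) as pool:
        return np.fromiter(pool.imap(calc_range, bounds), dtype=dtype)
```

This avoids both the error and the extra copies, at the cost of exactly the plumbing the question complains about.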


This would be so complicated and ugly that I'd rather live with the error and the extra memory copies. Is there another way that would require fewer changes to the existing code?


Solution


My usual approach (if you can live with extra memory copies) is to do all IO in one process and then send things out to a pool of worker processes. To load a slice of a memmapped array into memory, just do x = np.array(data[yourslice]) (data[yourslice].copy() doesn't actually do this, which can lead to some confusion).
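To illustrate that confusion concretely (the file name here is just scratch data for the demo): a slice of a memmap, and even .copy() of a slice, still has type np.memmap, whereas np.array(...) yields a plain in-memory ndarray:

```python
import numpy as np

# Scratch file purely for this illustration.
np.arange(10, dtype=np.float64).tofile('demo.dat')
data = np.memmap('demo.dat', dtype=np.float64, mode='r')

sliced = data[:5]              # still a memmap view of the file
copied = data[:5].copy()       # data now lives in RAM, but the type
                               # is still np.memmap -- the confusion
detached = np.array(data[:5])  # a plain ndarray, fully detached

print(type(sliced).__name__, type(copied).__name__, type(detached).__name__)
```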


First off, let's generate some test data:

import numpy as np
np.random.random(10000).tofile('data.dat')


You can reproduce your errors with something like this:

import numpy as np
import multiprocessing

def main():
    # np.float64 replaces the np.float alias removed in modern NumPy
    data = np.memmap('data.dat', dtype=np.float64, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float64)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    # list(...) is needed on Python 3, where range is lazy
    intervals = list(range(0, data.size, chunksize)) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()


And if you just switch to yielding np.array(data[start:stop]) instead, you'll fix the problem:

import numpy as np
import multiprocessing

def main():
    # np.float64 replaces the np.float alias removed in modern NumPy
    data = np.memmap('data.dat', dtype=np.float64, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float64)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    # list(...) is needed on Python 3, where range is lazy
    intervals = list(range(0, data.size, chunksize)) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()


Of course, this does make an extra in-memory copy of each chunk.


In the long run, you'll probably find that it's easier to switch away from memmapped files and move to something like HDF. This is especially true if your data is multidimensional. (I'd recommend h5py, but PyTables is nice if your data is "table-like".)
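As a rough sketch of that route, assuming the h5py package is available (the file and dataset names here are made up): convert the raw file once, then read only the slice you need, which comes back as a plain in-memory ndarray.

```python
import numpy as np
import h5py  # third-party package, assumed installed

def convert_to_hdf5(dat_path, h5_path, dtype=np.float64):
    """One-time conversion of the raw .dat file into a chunked HDF5 dataset."""
    data = np.memmap(dat_path, dtype=dtype, mode='r')
    with h5py.File(h5_path, 'w') as f:
        f.create_dataset('data', data=data, chunks=True)

def load_slice(h5_path, start, stop):
    """Read just one slice; h5py returns a plain in-memory ndarray."""
    with h5py.File(h5_path, 'r') as f:
        return f['data'][start:stop]
```

Workers can then be handed (start, stop) pairs and call load_slice themselves, so nothing memmap-related is ever pickled.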

Good luck, at any rate!
