python - How to parallelize a sum calculation in python numpy?

2022-01-09 00:00:00 python numpy scipy sum parallel-processing

Problem description

I have a sum that I'm trying to compute, and I'm having difficulty parallelizing the code. The calculation I'm trying to parallelize is kind of complex (it uses both numpy arrays and scipy sparse matrices). It spits out a numpy array, and I want to sum the output arrays from about 1000 calculations. Ideally, I would keep a running sum over all the iterations. However, I haven't been able to figure out how to do this.

So far, I've tried using joblib's Parallel function and the pool.map function with python's multiprocessing package. For both of these, I use an inner function that returns a numpy array. These functions return a list, which I convert to a numpy array and then sum over.

However, after the joblib Parallel function completes all iterations, the main program never continues running (it looks like the original job is in a suspended state, using 0% CPU). When I use pool.map, I get memory errors after all the iterations are complete.
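
A rough size estimate suggests why collecting every result before summing can run out of memory. This assumes each result is a float64 array of shape (1, 512*512), which is what np.zeros and np.ones produce by default in the code below, and about 1000 iterations:

```python
import numpy as np

num_iters = 1000                                   # "about 1000 calculations"
elements_per_result = 512 * 512                    # each result is 1 x 512^2
bytes_per_element = np.dtype(np.float64).itemsize  # 8 bytes (default dtype of np.zeros/np.ones)

bytes_per_result = elements_per_result * bytes_per_element
total_bytes = num_iters * bytes_per_result

print(f"one result: {bytes_per_result / 2**20:.0f} MiB")  # one result: 2 MiB
print(f"all results: {total_bytes / 2**30:.2f} GiB")      # all results: 1.95 GiB
```

Converting that list with np.array then allocates a second array of the same size while the list is still alive, so peak usage can roughly double. A running sum only ever needs one 2 MiB accumulator.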

Is there a way to simply parallelize a running sum of arrays?

Edit: The goal is to do something like the following, except in parallel.

import numpy as np

def summers(num_iters):
    sumArr = np.zeros((1, 512*512))  # initialize the running sum
    for index in range(num_iters):
        sumArr += computation(index)  # computation (defined in the solution below) returns a 1 x 512^2 numpy array

    return sumArr


Solution

I figured out how to parallelize a sum of arrays with multiprocessing, apply_async, and callbacks, so I'm posting this here for other people. I adapted the Sum callback class from the example page for Parallel Python, although I did not actually use that package for the implementation. It gave me the idea of using callbacks, though. Here's the simplified code I ended up using, and it does what I wanted it to do.

import multiprocessing
import threading
import numpy as np

class Sum:  # adapted from Parallel Python's example code (modified for an array, comments added)
    def __init__(self):
        self.value = np.zeros((1, 512*512))  # initialization of the running sum
        self.lock = threading.Lock()         # the Python 2 "thread" module no longer exists in Python 3
        self.count = 0

    def add(self, value):
        # Pool runs callbacks in a single result-handler thread of the parent
        # process, so the lock is a defensive measure rather than strictly required
        with self.lock:
            self.count += 1
            self.value += value  # the actual summation

def computation(index):
    array1 = np.ones((1, 512*512)) * index  # this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum()  # create an instance of the callback class and zero the sum
    for index in range(num_iters):
        pool.apply_async(computation, (index,), callback=sumArr.add)

    pool.close()
    pool.join()  # waits for all tasks to finish and all callbacks to run

    return sumArr.value

if __name__ == "__main__":  # required where multiprocessing uses "spawn" (Windows, macOS)
    result = summers(1000)
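
One caveat with apply_async plus callback: if computation raises, the callback is simply never called, so the sum silently comes out too small. Passing error_callback (available since Python 3.2) makes failures visible. The sketch below is an illustration, not the poster's code: SIZE and summers_checked are hypothetical names, and this computation deliberately fails for one index to show the effect.

```python
import multiprocessing
import numpy as np

SIZE = 512 * 512  # array length used in the question

def computation(index):
    # Hypothetical stand-in that fails for index 3 to demonstrate error handling
    if index == 3:
        raise ValueError(f"bad index {index}")
    return np.ones((1, SIZE)) * index

def summers_checked(num_iters, processes=4):
    total = np.zeros((1, SIZE))
    errors = []

    def add(value):
        # Runs in the parent's result-handler thread; add in place into total
        np.add(total, value, out=total)

    def record_error(exc):
        # Called instead of add() when a task raises
        errors.append(exc)

    pool = multiprocessing.Pool(processes=processes)
    for index in range(num_iters):
        pool.apply_async(computation, (index,),
                         callback=add, error_callback=record_error)
    pool.close()
    pool.join()  # returns only after every callback/error_callback has run

    if errors:
        raise RuntimeError(f"{len(errors)} task(s) failed") from errors[0]
    return total
```

Calling .get() on each AsyncResult would also re-raise the worker's exception, but error_callback keeps the fire-and-forget style of the original.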

I was also able to get this working using a parallelized map, which was suggested in another answer. I had tried this earlier, but I wasn't implementing it correctly. Both ways work, and I think this answer explains pretty well which method to use (map or apply_async). For the map version, you don't need to define the Sum class, and the summers function becomes:

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    # No need to preallocate outputArr/sumArr with np.zeros: both names are
    # rebound below, so preallocated arrays would never be used.
    # map blocks until every result is back and returns them all as a list.
    outputArr = np.array(pool.map(computation, range(num_iters)))  # shape (num_iters, 1, 512*512)
    sumArr = outputArr.sum(axis=0)  # shape (1, 512*512)

    pool.close()  # map has already waited for the results; close/join just shut the workers down
    pool.join()

    return sumArr
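
A middle ground between the two versions, offered here as a swapped-in technique rather than either approach above, is to let each worker keep its own running sum over a chunk of indices and stream those partial sums back with Pool.imap_unordered. The parent then receives only `processes` arrays instead of num_iters, and there is no callback class. SIZE, partial_sum and summers_chunked are hypothetical names; computation is the placeholder from the answer.

```python
import multiprocessing
import numpy as np

SIZE = 512 * 512  # array length used in the question

def computation(index):
    # Placeholder for the real array-returning computation (same as in the answer)
    return np.ones((1, SIZE)) * index

def partial_sum(indices):
    # Runs in a worker: running sum over one chunk of indices
    acc = np.zeros((1, SIZE))
    for index in indices:
        acc += computation(index)
    return acc

def summers_chunked(num_iters, processes=8):
    # One strided chunk per worker, e.g. range(0, n, 8), range(1, n, 8), ...
    chunks = [range(start, num_iters, processes) for start in range(processes)]
    total = np.zeros((1, SIZE))
    with multiprocessing.Pool(processes=processes) as pool:
        # Partial sums arrive in completion order and are added as they come
        for part in pool.imap_unordered(partial_sum, chunks):
            total += part
    return total
```

Because the additions happen in a different order from the serial loop, results for non-integer data may differ from it in the last few bits.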
