Applying a method to a list of objects in parallel with multiprocessing

2022-01-12 python multiprocessing

Problem description

I have created a class with a number of methods. One of the methods, my_process, is very time consuming, and I'd like to run it in parallel. I came across Python Multiprocessing - apply class method to a list of objects but I'm not sure how to apply it to my problem, nor what effect it will have on the other methods of my class.

class MyClass():
    def __init__(self, input):
        self.input = input
        self.result = int

    def my_process(self, multiply_by, add_to):
        self.result = self.input * multiply_by
        self._my_sub_process(add_to)
        return self.result

    def _my_sub_process(self, add_to):
        self.result += add_to

list_of_numbers = range(0, 5)
list_of_objects = [MyClass(i) for i in list_of_numbers]
list_of_results = [obj.my_process(100, 1) for obj in list_of_objects] # multi-process this for-loop

print list_of_numbers
print list_of_results

[0, 1, 2, 3, 4]
[1, 101, 201, 301, 401]


Solution

I'm going to go against the grain here, and suggest sticking to the simplest thing that could possibly work ;-) That is, Pool.map()-like functions are ideal for this, but are restricted to passing a single argument. Rather than make heroic efforts to worm around that, simply write a helper function that only needs a single argument: a tuple. Then it's all easy and clear.

Here's a complete program taking that approach, which prints what you want under Python 2, regardless of OS:

class MyClass():
    def __init__(self, input):
        self.input = input
        self.result = int

    def my_process(self, multiply_by, add_to):
        self.result = self.input * multiply_by
        self._my_sub_process(add_to)
        return self.result

    def _my_sub_process(self, add_to):
        self.result += add_to

import multiprocessing as mp
NUM_CORE = 4  # set to the number of cores you want to use

def worker(arg):
    # unpack the (object, multiply_by, add_to) tuple and run the expensive method
    obj, m, a = arg
    return obj.my_process(m, a)

if __name__ == "__main__":
    list_of_numbers = range(0, 5)
    list_of_objects = [MyClass(i) for i in list_of_numbers]

    pool = mp.Pool(NUM_CORE)
    list_of_results = pool.map(worker, ((obj, 100, 1) for obj in list_of_objects))
    pool.close()
    pool.join()

    print list_of_numbers
    print list_of_results
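
For what it's worth, the same driver needs only cosmetic changes to run under Python 3 as well. A minimal sketch, assuming the same MyClass and worker definitions as above (print becomes a function, and range() is wrapped in list() so it prints as a list):

if __name__ == "__main__":
    list_of_numbers = list(range(0, 5))   # range() is lazy in Python 3
    list_of_objects = [MyClass(i) for i in list_of_numbers]

    pool = mp.Pool(NUM_CORE)
    list_of_results = pool.map(worker, ((obj, 100, 1) for obj in list_of_objects))
    pool.close()
    pool.join()

    print(list_of_numbers)    # print is a function in Python 3
    print(list_of_results)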

A bit of magic

I should note that the very simple approach I suggested has a number of advantages. Beyond the fact that it "just works" under Python 2 and 3, requires no changes to your class, and is easy to understand, it also plays nice with all of the Pool methods.
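
For instance, a rough sketch of the same tasks fed through Pool.imap_unordered() instead of Pool.map(), reusing the worker and NUM_CORE from above (results then come back in completion order rather than input order):

pool = mp.Pool(NUM_CORE)
# imap_unordered yields each result as soon as a worker finishes it
unordered_results = list(pool.imap_unordered(worker, ((obj, 100, 1) for obj in list_of_objects)))
pool.close()
pool.join()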

However, if you have multiple methods you want to run in parallel, it can get a bit annoying to write a tiny worker function for each. So here's a tiny bit of "magic" to worm around that. Change worker() like so:

def worker(arg):
    # first two items are the object and the method name; the rest are its arguments
    obj, methname = arg[:2]
    return getattr(obj, methname)(*arg[2:])

Now a single worker function suffices for any number of methods, with any number of arguments. In your specific case, just change one line to match:

list_of_results = pool.map(worker, ((obj, "my_process", 100, 1) for obj in list_of_objects))
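
To illustrate the "any number of methods" point, a mixed batch can go through the very same worker in a single pool.map() call. This is purely hypothetical: my_other_process is an invented method name, not part of the class above.

tasks = [(obj, "my_process", 100, 1) for obj in list_of_objects]
# hypothetical: if MyClass also had a my_other_process(divide_by) method,
# its calls could ride along in the same task list
tasks += [(obj, "my_other_process", 10) for obj in list_of_objects]
mixed_results = pool.map(worker, tasks)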

More-or-less obvious generalizations can also cater to methods with keyword arguments. But, in real life, I usually stick to the original suggestion. At some point catering to generalizations does more harm than good. Then again, I like obvious things ;-)
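
One such generalization might pack an explicit positional-args tuple and keyword-args dict into each task. A sketch (not part of the original answer) could look like:

def worker(arg):
    # expect (object, method name, positional args tuple, keyword args dict)
    obj, methname, args, kwargs = arg
    return getattr(obj, methname)(*args, **kwargs)

list_of_results = pool.map(worker, ((obj, "my_process", (100,), {"add_to": 1}) for obj in list_of_objects))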
