Calling multiprocessing from a class method in Python
Problem Description
Initially, I have a class that stores some processed values and reuses them in its other methods.
The problem is that when I tried to divide the class method's work across multiple processes to speed it up, Python spawned the processes but they didn't seem to do anything (Task Manager showed only one process running) and the result was never delivered.
I did a couple of searches and found that pathos.multiprocessing can do this instead, but I wonder if the standard library can solve this problem?
    from multiprocessing import Pool

    class A():
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = Pool(processes=4)
            rs = t.map(self.cal, dt)
            t.close()
            return rs

    a = A(2)
    a.run(list(range(10)))
Solution
Your code fails because it cannot pickle the instance method (self.cal), which is what Python attempts to do when you spawn multiple processes by mapping work onto a multiprocessing.Pool (well, there is a way to do it, but it's way too convoluted and not extremely useful anyway). Since there is no shared memory access, it has to 'pack' the data and send it to the spawned process for unpacking. The same would happen if you tried to pickle the a instance.
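To see what pickling can and cannot move between processes, here is a minimal illustration. (Note that recent Python 3 versions can pickle bound methods of module-level classes, so the failure described above is most visible on Python 2, with non-picklable instance state, or with callables such as lambdas; the lambda below stands in for any unpicklable callable.)

    import pickle

    # Plain data round-trips through pickle without trouble.
    data = {"vl": 2, "items": list(range(10))}
    assert pickle.loads(pickle.dumps(data)) == data

    # A lambda, like other unpicklable callables, cannot be
    # 'packed' for shipment to another process.
    try:
        pickle.dumps(lambda nb: nb * 2)
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        print("cannot pickle:", exc)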
The only shared-memory option available in the multiprocessing package is the little-known multiprocessing.pool.ThreadPool, so if you really want to do this:
    from multiprocessing.pool import ThreadPool

    class A():
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = ThreadPool(processes=4)
            rs = t.map(self.cal, dt)
            t.close()
            return rs

    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
But this will not give you parallelization, as it essentially maps to regular threads, which do have access to the shared memory. You should instead pass class/static methods (if you need them called), accompanied by the data you want them to work with (in your case self.vl). If you need to share that data across processes, you'll have to use a shared-memory abstraction such as multiprocessing.Value, applying a mutex along the way, of course.
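As a sketch of that advice, the work can be moved to a module-level function that receives the needed piece of state (self.vl) alongside each item; the names here are illustrative, not part of the original code:

    from multiprocessing import Pool

    def cal(args):  # module-level, so it pickles cleanly on every platform
        nb, vl = args
        return nb * vl

    class A:
        def __init__(self, vl):
            self.vl = vl

        def run(self, dt):
            # ship the data (self.vl) with every item instead of a bound method
            with Pool(processes=4) as t:
                return t.map(cal, [(nb, self.vl) for nb in dt])

    if __name__ == "__main__":
        print(A(2).run(list(range(10))))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]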
Update
I said you could do it (and there are modules that more or less do it; check pathos.multiprocessing for example), but I don't think it's worth the trouble. When you reach the point where you have to trick your system into doing what you want, chances are you're either using the wrong system or you should rethink your design. But for the sake of informedness, here is one way to do what you want in a multiprocessing setting:
    import sys
    from multiprocessing import Pool

    def parallel_call(params):  # a helper for calling 'remote' instances
        cls = getattr(sys.modules[__name__], params[0])  # get our class type
        instance = cls.__new__(cls)  # create a new instance without invoking __init__
        instance.__dict__ = params[1]  # apply the passed state to the new instance
        method = getattr(instance, params[2])  # get the requested method
        args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
        return method(*args)  # expand arguments, call our method and return the result

    class A(object):
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = Pool(processes=4)
            rs = t.map(parallel_call, self.prepare_call("cal", dt))
            t.close()
            return rs

        def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
            for arg in args:
                yield [self.__class__.__name__, self.__dict__, name, arg]

    if __name__ == "__main__":  # important protection for cross-platform use
        a = A(2)
        print(a.run(list(range(10))))
        # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
I think how it works is pretty self-explanatory, but in short it passes the name of your class, its current state (sans signals, tho), the desired method to be called, and the arguments to invoke it with, to the parallel_call function, which is called for each process in the Pool. Python automatically pickles and unpickles all this data, so all parallel_call needs to do is reconstruct the original object, find the desired method in it, and call it with the provided param(s).
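The reconstruction step inside parallel_call can also be seen in isolation; this toy snippet (no processes involved) mirrors what happens on the worker side after unpickling:

    class A:
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

    # What the worker effectively receives: a state dict, a method name, an argument.
    state = {"vl": 2}

    instance = A.__new__(A)    # allocate without running __init__
    instance.__dict__ = state  # restore the transferred state
    method = getattr(instance, "cal")
    print(method(5))           # prints: 10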
This way we're passing only the data without trying to pass active objects, so Python doesn't complain (well, in this case, try adding a reference to an instance method to your class parameters and see what happens) and everything works just fine.
If you want to go heavy on the 'magic', you can make it look exactly like your code (create your own Pool handler, pick up names from the functions and send them to the actual processes, etc.), but this should serve well enough for your example.
However, before you get your hopes up, keep in mind that this only works when sharing a 'static' instance (one that doesn't change its initial state once you start invoking it in a multiprocessing context). If the A.cal method were to change the internal state of the vl property, it would affect only the instance where the change happens (unless it changes in the main instance that calls the Pool between calls). If you want to share the state as well, you can upgrade parallel_call to pick up instance.__dict__ after the call and return it together with the method call result; then on the calling side you'd have to update the local __dict__ with the returned data to change the original state. And even that's not enough - you'd actually have to create a shared dict and handle all the mutex stuff to have it concurrently accessed by all the processes (you can use multiprocessing.Manager for that).
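Here is a sketch of that shared-state route using multiprocessing.Manager (the names work and run_shared are illustrative, not from the code above). A Manager dict proxy can be handed to pool workers, and the manager serializes individual accesses for you, though compound read-modify-write sequences would still need an explicit lock:

    from multiprocessing import Manager, Pool

    def work(args):  # illustrative worker writing into a shared Manager dict
        key, shared = args
        shared[key] = key * 2
        return shared[key]

    def run_shared(n=5):
        with Manager() as m:
            shared = m.dict()  # a proxy object that is safe to pass to workers
            with Pool(processes=2) as p:
                rs = p.map(work, [(i, shared) for i in range(n)])
            return rs, dict(shared)

    if __name__ == "__main__":
        rs, state = run_shared()
        print(rs)     # [0, 2, 4, 6, 8]
        print(state)  # {0: 0, 1: 2, 2: 4, 3: 6, 4: 8}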
So, as I was saying, it's more trouble than it's worth...