为什么ProcessPoolExecutor和Pool在调用Super()时崩溃?
问题描述
1.为什么以下使用concurrent.futures
模块的Python代码永远挂起?
import concurrent.futures
class A:
def f(self):
print("called")
class B(A):
def f(self):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
executor.submit(super().f)
if __name__ == "__main__":
B().f()
该调用引发不可见的异常[Errno 24] Too many open files
(若要查看该异常,请将行executor.submit(super().f)
替换为print(executor.submit(super().f).exception())
)。
但是,将ProcessPoolExecutor
替换为ThreadPoolExecutor
会如预期的那样打印";调用";。
2.以下使用multiprocessing.pool
模块的Python代码为什么引发异常AssertionError: daemonic processes are not allowed to have children
?
import multiprocessing.pool
class A:
def f(self):
print("called")
class B(A):
def f(self):
pool = multiprocessing.pool.Pool(2)
pool.apply(super().f)
if __name__ == "__main__":
B().f()
但是,将Pool
替换为ThreadPool
会如预期的那样打印";调用";。
环境:CPython3.7、MacOS 10.14。
解决方案
concurrent.futures.ProcessPoolExecutor
和multiprocessing.pool.Pool
使用multiprocessing.queues.Queue
将功函数对象从调用方传递到工作进程,Queue
使用pickle
模块进行序列化/反序列化,但无法正确处理绑定了子类实例的方法对象:
f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)
输出:
<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>
A.f
变为B.f
,这将有效地在工作进程中创建对B.f
toB.f
的无限递归调用。
pickle.dumps
绑定方法对象的利用__reduce__
方法,IMO,its implementation没有考虑这个场景,它没有照顾到真正的func
对象,而只是试图从实例self
obj(B()
)中取回简单名称(f
),这导致了B.f
,很可能是一个错误。
好消息是,因为我们知道问题出在哪里,我们可以通过实现我们自己的归约函数来修复它,该函数尝试从原始函数(A.f
)和实例obj(B()
)重新创建绑定的方法对象:
import types
import copyreg
import multiprocessing
def my_reduce(obj):
return (obj.__func__.__get__, (obj.__self__,))
copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)
我们可以这样做,因为绑定方法是一个描述符。
PS:我已提交a bug report。
相关文章