为什么ProcessPoolExecutor和Pool在调用Super()时崩溃?

问题描述

1.为什么以下使用concurrent.futures模块的Python代码永远挂起?

import concurrent.futures


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
        executor.submit(super().f)


if __name__ == "__main__":
    B().f()

该调用引发不可见的异常[Errno 24] Too many open files(若要查看该异常,请将行executor.submit(super().f)替换为print(executor.submit(super().f).exception()))。

但是,将ProcessPoolExecutor替换为ThreadPoolExecutor会如预期的那样打印";调用";。

2.以下使用multiprocessing.pool模块的Python代码为什么引发异常AssertionError: daemonic processes are not allowed to have children

import multiprocessing.pool


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        pool = multiprocessing.pool.Pool(2)
        pool.apply(super().f)


if __name__ == "__main__":
    B().f()

但是,将Pool替换为ThreadPool会如预期的那样打印";调用";。

环境:CPython3.7、MacOS 10.14。


解决方案

concurrent.futures.ProcessPoolExecutormultiprocessing.pool.Pool使用multiprocessing.queues.Queue将功函数对象从调用方传递到工作进程,Queue使用pickle模块进行序列化/反序列化,但无法正确处理绑定了子类实例的方法对象:

f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)

输出:

<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>

A.f变为B.f,这将有效地在工作进程中创建对B.ftoB.f的无限递归调用。

pickle.dumps绑定方法对象的利用__reduce__方法,IMO,its implementation没有考虑这个场景,它没有照顾到真正的func对象,而只是试图从实例selfobj(B())中取回简单名称(f),这导致了B.f,很可能是一个错误。

好消息是,因为我们知道问题出在哪里,我们可以通过实现我们自己的归约函数来修复它,该函数尝试从原始函数(A.f)和实例obj(B())重新创建绑定的方法对象:

import types
import copyreg
import multiprocessing

def my_reduce(obj):
    return (obj.__func__.__get__, (obj.__self__,))

copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)

我们可以这样做,因为绑定方法是一个描述符。

PS:我已提交a bug report。

相关文章