Python multiprocessing pool hangs at join?

2022-01-12 00:00:00 python multiprocessing

Problem description

I'm trying to run some Python code on several files in parallel. The construct is basically:

import multiprocessing
import sys

def process_file(filename, foo, bar, baz=None):
    # do stuff that may fail and cause exception
    pass

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=psize)

    # submit one task per file; a list comprehension runs eagerly,
    # whereas map() in Python 3 is lazy and would submit nothing
    results = [pool.apply_async(process_file, (x, foo, bar), dict(baz=biz))
               for x in sys.argv[1:]]
    pool.close()
    pool.join()

I've previously used pool.map to do something similar and it worked great, but I can't seem to use it here because pool.map doesn't (appear to) let me pass in extra arguments (and using a lambda to do it won't work because lambdas can't be pickled).
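For the pool.map limitation described above, one common workaround (not what the original poster ended up using) is functools.partial: a partial object that wraps a module-level function can be pickled, unlike a lambda. The sketch below assumes Python 3; the body of process_file and the parameter values are hypothetical placeholders. On Python 3.3+, pool.starmap with argument tuples is another option.

```python
import functools
import multiprocessing
import sys

def process_file(filename, foo, bar, baz=None):
    # Hypothetical stand-in for the real per-file work.
    return '%s processed with foo=%r bar=%r baz=%r' % (filename, foo, bar, baz)

def main(filenames):
    foo, bar, biz = 1, 2, 3  # hypothetical parameter values
    # partial() pins foo, bar and baz; pool.map then supplies only the filename.
    # Because process_file is defined at module level, the partial is picklable.
    worker = functools.partial(process_file, foo=foo, bar=bar, baz=biz)
    with multiprocessing.Pool() as pool:
        return pool.map(worker, filenames)

if __name__ == '__main__':
    for line in main(sys.argv[1:]):
        print(line)
```

Binding the fixed arguments by keyword keeps the filename as the single positional argument that pool.map passes in.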

So now I'm trying to get things to work using apply_async() directly. My issue is that the code seems to hang and never exit. A few of the files fail with an exception, but I don't see why that would cause join() to fail or hang. Interestingly, if none of the files fail with an exception, it does exit cleanly.

What am I missing?

When the function (and thus a worker) fails, I see this exception:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

If I see even one of these, the parent process hangs forever, never reaping the children or exiting.

Solution

Sorry to answer my own question, but I've found at least a workaround, so in case anyone else has a similar issue I want to post it here. I'll accept any better answers out there.

I believe the root of the issue is http://bugs.python.org/issue9400 . This tells me two things:

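  • I'm not crazy; what I'm trying to do really should work.
  • At least in Python 2, it is very difficult, if not impossible, to "return" exceptions to the parent process. Simple ones work, but many others don't.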

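In my case, my worker function was launching a subprocess that was segfaulting. That raised a CalledProcessError, which the parent could not unpickle: the TypeError in the traceback above comes from get() inside the pool's _handle_results thread. Presumably because that result-handler thread dies, the remaining results are never collected, and the pool object in the parent never returns from the call to join().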
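The failure mode can be reproduced without a pool at all, since the pool pickles a task's outcome in the worker and unpickles it in the parent. The minimal sketch below assumes Python 3; CommandFailed and PicklableCommandFailed are hypothetical classes that only mimic the pattern suggested by the empty `()` argument tuple in the traceback, not the actual definition of subprocess.CalledProcessError.

```python
import pickle

class CommandFailed(Exception):
    """Hypothetical exception that mimics the failure in the traceback."""

    def __init__(self, returncode, cmd):
        # Calling Exception.__init__ with no arguments leaves self.args == (),
        # so pickle records "call CommandFailed()" with no arguments.
        super().__init__()
        self.returncode = returncode
        self.cmd = cmd

class PicklableCommandFailed(Exception):
    """Same data, but every constructor argument is forwarded to Exception."""

    def __init__(self, returncode, cmd):
        super().__init__(returncode, cmd)  # self.args == (returncode, cmd)
        self.returncode = returncode
        self.cmd = cmd

def round_trip(exc):
    """Pickle exc (worker side), then unpickle it (parent side)."""
    data = pickle.dumps(exc)       # succeeds for both classes
    try:
        return pickle.loads(data)  # re-creates the object as type(exc)(*exc.args)
    except TypeError as err:
        return err                 # the same kind of error _handle_results hit

if __name__ == '__main__':
    print(repr(round_trip(CommandFailed(-11, ['./crashy']))))
    print(repr(round_trip(PicklableCommandFailed(-11, ['./crashy']))))
```

Pickling never complains; the error only appears when the parent tries to rebuild the exception, which matches where the traceback points.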

In my particular case, I don't care what the exception was; at most I want to log it and keep going. To do this, I simply wrap my top-level worker function in a try/except clause. If the worker raises any exception, it is caught and logged before anything is sent back to the parent process, and the task then finishes normally, since it is no longer trying to send the exception through. See below:

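import traceback

def process_file_wrapped(filename, foo, bar, baz=None):
    # Catch every exception inside the worker so that no exception object
    # ever has to be pickled and sent back to the parent process.
    try:
        process_file(filename, foo, bar, baz=baz)
    except Exception:
        print('%s: %s' % (filename, traceback.format_exc()))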

Then, I have my initial map function call process_file_wrapped() instead of the original one. Now my code works as intended.
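An end-to-end sketch of the workaround follows, assuming Python 3. It is a variant of the wrapper above rather than the poster's exact code: instead of printing, the hypothetical process_file_wrapped returns a plain (filename, ok, detail) tuple, so the parent can still report which files failed. The failing process_file and the `.bad` naming rule are invented for illustration.

```python
import multiprocessing
import sys
import traceback

def process_file(filename, foo, bar, baz=None):
    # Hypothetical stand-in for the real work: fails for some inputs.
    if filename.endswith('.bad'):
        raise ValueError('cannot process %s' % filename)
    return len(filename) + foo + bar

def process_file_wrapped(filename, foo, bar, baz=None):
    # Never let an exception escape the worker; return an ordinary tuple
    # instead (it pickles cleanly as long as the result value itself does).
    try:
        return (filename, True, process_file(filename, foo, bar, baz=baz))
    except Exception:
        return (filename, False, traceback.format_exc())

def run_all(filenames, foo=1, bar=2, biz=None):
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 2)
    pending = [pool.apply_async(process_file_wrapped, (name, foo, bar), dict(baz=biz))
               for name in filenames]
    pool.close()
    pool.join()  # every task returns a tuple, so no exception crosses processes
    return [r.get() for r in pending]

if __name__ == '__main__':
    for name, ok, detail in run_all(sys.argv[1:]):
        if ok:
            print('%s: ok (%s)' % (name, detail))
        else:
            print('%s: failed\n%s' % (name, detail))
```

Keeping the AsyncResult objects and calling get() after join() lets the parent see every outcome, including the formatted traceback of each failure.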
