我可以在类的方法中使用 multiprocessing.Pool 吗?

问题描述

我正在尝试在我的代码中使用 multiprocessing 以获得更好的性能.

I am tring to use multiprocessing in my code for better performance.

但是,我收到如下错误:

However, I got an error as follows:

Traceback (most recent call last):
  File "D:EpubBuilderTinyEpub.py", line 49, in <module>
    e.epub2txt()
  File "D:EpubBuilderTinyEpub.py", line 43, in epub2txt
    tempread = self.get_text()
  File "D:EpubBuilderTinyEpub.py", line 29, in get_text
    txtlist = pool.map(self.char2text,charlist)
  File "C:Python34libmultiprocessingpool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:Python34libmultiprocessingpool.py", line 599, in get
    raise self._value
  File "C:Python34libmultiprocessingpool.py", line 383, in _handle_tasks
    put(task)
  File "C:Python34libmultiprocessingconnection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "C:Python34libmultiprocessingeduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.BufferedReader' object

我尝试了另一种方法并得到了这个错误:

I have tried it an other way and got this error:

TypeError: cannot serialize '_io.TextIOWrapper' object

我的代码如下所示:

from multiprocessing import Pool
class Book(object):
    def __init__(self, arg):
        self.namelist = arg
    def format_char(self,char):
        char = char + "a"
        return char
    def format_book(self):
        self.tempread = ""
        charlist = [f.read() for f in self.namelist] #list of char
        with Pool() as pool:
            txtlist = pool.map(self.format_char,charlist)
        self.tempread = "".join(txtlist)
        return self.tempread

if __name__ == '__main__':
    import os
    b = Book([open(f) for f in os.listdir()])
    t = b.format_book()
    print(t)

我认为这个错误是因为没有在main函数中使用Pool引起的.

I think that the error is raised because of not using the Pool in the main function.

我的猜想对吗?以及如何修改我的代码来修复错误?

Is my conjecture right? And how can I modify my code to fix the error?


解决方案

问题是你在 Book 实例中有一个不可选择的实例变量 (namelist).因为您在实例方法上调用 pool.map,并且您在 Windows 上运行,所以整个实例需要是可挑选的,以便将其传递给子进程.Book.namelist 是一个打开的文件对象(_io.BufferedReader),不能被pickle.您可以通过多种方式解决此问题.根据示例代码,您可以将 format_char 设为顶级函数:

The issue is that you've got an unpicklable instance variable (namelist) in the Book instance. Because you're calling pool.map on an instance method, and you're running on Windows, the entire instance needs to be picklable in order for it to be passed to the child process. Book.namelist is a open file object (_io.BufferedReader), which can't be pickled. You can fix this a couple of ways. Based on the example code, it looks like you could just make format_char a top-level function:

def format_char(char):
    char = char + "a"
    return char


class Book(object):
    def __init__(self, arg):
        self.namelist = arg

    def format_book(self):
        self.tempread = ""
        charlist = [f.read() for f in self.namelist] #list of char
        with Pool() as pool:
            txtlist = pool.map(format_char,charlist)
        self.tempread = "".join(txtlist)
        return self.tempread

但是,如果实际上,您需要 format_char 作为实例方法,则可以使用 __getstate__/__setstate__ 通过删除 使 Book 可挑选namelist 在腌制之前从实例中获取参数:

However, if in reality, you need format_char to be an instance method, you can use __getstate__/__setstate__ to make Book picklable, by removing the namelist argument from the instance before pickling it:

class Book(object):
    def __init__(self, arg):
        self.namelist = arg

    def __getstate__(self):
        """ This is called before pickling. """
        state = self.__dict__.copy()
        del state['namelist']
        return state

    def __setstate__(self, state):
        """ This is called while unpickling. """
        self.__dict__.update(state)

    def format_char(self,char):
        char = char + "a"

    def format_book(self):
        self.tempread = ""
        charlist = [f.read() for f in self.namelist] #list of char
        with Pool() as pool:
            txtlist = pool.map(self.format_char,charlist)
        self.tempread = "".join(txtlist)
        return self.tempread

只要你不需要在子进程中访问namelist就可以了.

This would be ok as long as you don't need to access namelist in the child process.

相关文章