How can I process a tarfile with a Python multiprocessing pool?
Problem description
I'm trying to process the contents of a tarfile using multiprocessing.Pool. I'm able to successfully use the ThreadPool implementation within the multiprocessing module, but would like to be able to use processes instead of threads, as it would possibly be faster and eliminate some changes made for Matplotlib to handle the multithreaded environment. I'm getting an error that I suspect is related to processes not sharing address space, but I'm not sure how to fix it:
Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file
The actual program is more complicated, but this is an example of what I'm doing that reproduces the error:
from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()
I suspect that something's wrong when the TarInfo object is passed into the other process but the parent TarFile is not, but I'm not sure how to fix it in the multiprocess case. Can I do this without having to extract files from the tarball and write them to disk?
Solution
You're not passing a TarInfo object into the other process, you're passing the result of tar.extractfile(member) into the other process, where member is a TarInfo object. The extractfile(...) method returns a file-like object which has, among other things, a read() method which operates upon the original tar file you opened with tar = tarfile.open('test.tar').
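That dependency is easy to see without multiprocessing at all. The following is a minimal sketch (written in Python 3 spelling, unlike the question's Python 2 code) showing that the object returned by extractfile() reads through the parent TarFile's handle, so closing the parent produces the same ValueError the traceback shows:

```python
import io
import os
import tarfile
import tempfile

# Build a tiny one-member tar to demonstrate with.
path = os.path.join(tempfile.mkdtemp(), 'test.tar')
with tarfile.open(path, 'w') as tar:
    info = tarfile.TarInfo('file1.txt')
    data = b'line1'
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))

tar = tarfile.open(path)
member = tar.extractfile('file1.txt')  # file-like view onto tar's handle
tar.close()                            # close the parent TarFile...
try:
    member.read()                      # ...and the child object is unusable
except ValueError:
    print('read() failed: parent tar file is closed')
```

Pickling such an object into a worker process severs it from the parent's open handle in much the same way, which is why the names (strings) should cross the process boundary instead.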
However, you can't use an open file from one process in another process; you have to re-open the file. I replaced your test_multiproc() with this:
def test_multiproc():
    tar = tarfile.open('test.tar')
    files = [name for name in tar.getnames()]
    pool = Pool(processes=1)
    result = pool.map(read_file2, files)
    tar.close()
and added this:
def read_file2(name):
    t2 = tarfile.open('test.tar')
    print t2.extractfile(name).read()
    t2.close()
and was able to get your code working.
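If re-opening the tar for every single member is too costly, one variant is to open it once per worker via Pool's initializer hook. This is a sketch only, in Python 3 spelling (the answer's code is Python 2), and the _init_worker/read_member/read_all names are illustrative, not from the original answer:

```python
import tarfile
from multiprocessing import Pool

_tar = None  # per-worker handle, set once by the initializer


def _init_worker(tar_path):
    # Runs once in each worker process: give it its own open TarFile.
    global _tar
    _tar = tarfile.open(tar_path)


def read_member(name):
    # Runs in the worker, against the worker's own handle.
    return _tar.extractfile(name).read()


def read_all(tar_path):
    # Collect member names in the parent; only strings cross the
    # process boundary, never open file objects.
    with tarfile.open(tar_path) as tar:
        names = tar.getnames()
    with Pool(processes=2, initializer=_init_worker,
              initargs=(tar_path,)) as pool:
        return pool.map(read_member, names)
```

Each worker pays the open cost once instead of once per member, while still honoring the rule above: every process reads from a file it opened itself.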