如何使用多进程池读取文件
问题描述
我要读取内容约为2 GB的文件,我尝试使用多进程池来执行,但收到错误:
TypeError: 'type' object is not iterable
我知道map总是接受可迭代参数,但有什么方法可以做到这一点吗?
到目前为止,我的代码如下:
def load_embeddings(FileName):
#file = open(FileName,'r')
embeddings = {}
i = 0
print "Loading word embeddings first time"
for line in FileName:
# print line
tokens = line.split(' ')
tokens[-1] = tokens[-1].strip()
#each line has 400 tokens
for i in xrange(1, len(tokens)):
tokens[i] = float(tokens[i])
embeddings[tokens[0]] = tokens[1:-1]
print "finished"
return embeddings
if __name__ == "__main__":
t1 = time.time()
p = Pool(processes=5)
FileName = './asag/Resources/EN-wform.w.5.cbow.neg10.400.subsmpl.txt'
file_ = open(FileName,'r')
#fun = partial(load_embeddings,FileName)
result = p.map(load_embeddings, file_)
p.close()
p.join()
print ("Time it took :" + str(time.time() - t1))
解决方案
如果您的源代码在单进程环境中运行,则它是正确的。尽管您的参数FileName
应该命名为file
,因为它实际上是一个打开的文件句柄,而不是一个文件名(字符串)。
现在发生的情况是,您为5个进程提供了相同的文件句柄来处理。使用for line in FileName
在文件句柄上执行读取操作。这在5个不同的过程中并行发生。所有人都不知道其他程序(这就是它的美妙之处:对于操作系统来说,这些是运行的不同程序。但它们都从相同的文件句柄读取)。现在,这似乎不是原子的,该调用可能在仅部分读取行之后被中断。也可能是,该Python在内部缓冲,但缓冲是按进程进行的。这会导致line
中有半行,或者第一行的一部分和第二行的一部分(因为在看到第一行
之前,它只进行读取),然后当您想要进一步处理该行时,您会得到错误。
要解决此问题,您需要首先在主进程中读取文件,并将行传递给map
函数,如下所示:
from multiprocessing import Pool
def load_embeddings(line):
embeddings = {}
i = 0
tokens = line.split(' ')
tokens[-1] = tokens[-1].strip()
#each line has 400 tokens
for i in xrange(1, len(tokens)):
tokens[i] = float(tokens[i])
embeddings[tokens[0]] = tokens[1:-1]
print "finished"
return embeddings
if __name__ == "__main__":
p = Pool(processes=5)
file_name = 'file.tsf'
lines = []
with open(file_name,'r') as f:
for line in f:
lines.append(line.strip())
result = p.map(load_embeddings, lines)
p.close()
p.join()
相关文章