如何使用多进程池读取文件

问题描述

我要读取内容约为2 GB的文件,我尝试使用多进程池来执行,但收到错误:

TypeError: 'type' object is not iterable
我知道map总是接受可迭代参数,但有什么方法可以做到这一点吗? 到目前为止,我的代码如下:

def load_embeddings(FileName):

    #file = open(FileName,'r')
    embeddings = {}
    i = 0
    print  "Loading word embeddings first time"
    for line in FileName:
             # print line

            tokens = line.split('	')
            tokens[-1] = tokens[-1].strip()

            #each line has 400 tokens
            for i in xrange(1, len(tokens)):
                    tokens[i] = float(tokens[i])
                    embeddings[tokens[0]] = tokens[1:-1]
    print  "finished"
    return embeddings

if __name__ == "__main__":

    t1 = time.time()
    p = Pool(processes=5)
    FileName  = './asag/Resources/EN-wform.w.5.cbow.neg10.400.subsmpl.txt'
    file_ = open(FileName,'r')
    #fun = partial(load_embeddings,FileName) 
    result = p.map(load_embeddings, file_)
    p.close()
    p.join()
    print ("Time it took :" + str(time.time() - t1))

解决方案

如果您的源代码在单进程环境中运行,则它是正确的。尽管您的参数FileName应该命名为file,因为它实际上是一个打开的文件句柄,而不是一个文件名(字符串)。

现在发生的情况是,您为5个进程提供了相同的文件句柄来处理。使用for line in FileName在文件句柄上执行读取操作。这在5个不同的过程中并行发生。所有人都不知道其他程序(这就是它的美妙之处:对于操作系统来说,这些是运行的不同程序。但它们都从相同的文件句柄读取)。现在,这似乎不是原子的,该调用可能在仅部分读取行之后被中断。也可能是,该Python在内部缓冲,但缓冲是按进程进行的。这会导致line中有半行,或者第一行的一部分和第二行的一部分(因为在看到第一行 之前,它只进行读取),然后当您想要进一步处理该行时,您会得到错误。

要解决此问题,您需要首先在主进程中读取文件,并将行传递给map函数,如下所示:

from multiprocessing import Pool

def load_embeddings(line):
    embeddings = {}
    i = 0
    tokens = line.split('	')
    tokens[-1] = tokens[-1].strip()

    #each line has 400 tokens
    for i in xrange(1, len(tokens)):
            tokens[i] = float(tokens[i])
            embeddings[tokens[0]] = tokens[1:-1]
    print "finished"
    return embeddings

if __name__ == "__main__":
    p = Pool(processes=5)
    file_name  = 'file.tsf'
    lines = []
    with open(file_name,'r') as f:
        for line in f:
            lines.append(line.strip())

    result = p.map(load_embeddings, lines)
    p.close()
    p.join()

相关文章