Reading multiple files using multiprocessing

2022-01-12 00:00:00 python performance multiprocessing

Problem description

I need to read several very large text files (100+ MB), process every line with a regex, and store the data in a structure. My structure inherits from defaultdict and has a read(self) method that reads the file named by self.file_name.

Here is a very simple (but not real) example. It doesn't use a regex; it just splits each line:


import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    This class stores odd lines in self["odd"] and even lines in self["even"].
    It is stupid, but it's only an example. In the real case the class
    has additional methods that do computation on the data it has read.
    """
    def __init__(self,file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self,SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines %2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines, self.file_name)

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do,file_names)
    pool.close()
    pool.join()
    print "Finish"      

At the end I need to merge all the results into a single Container, and it is important that the order of the lines is preserved. My approach is too slow when returning the values. Is there a better solution? I'm using Python 2.6 on Linux.


Solution

You're probably hitting two problems.

One of them was mentioned: you're reading multiple files at once. Those reads end up interleaved, which causes disk thrashing. You want to read each file in full, one at a time, and then parallelize only the computation on the data.
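As a rough sketch of that first point (not the asker's real code): the parent process reads the files one after another, and only the line splitting goes to a pool. The names read_whole_file, split_lines, process_files and the processes parameter are made up for illustration, and the sketch is written for Python 3 rather than the 2.6 from the question. Note that it still ships the text and the results through pipes, so it only addresses the disk side of the problem.

```python
import multiprocessing


def read_whole_file(file_name):
    """Read one file completely in a single sequential pass."""
    with open(file_name) as f:
        return f.read()


def split_lines(text):
    """CPU-bound step: split every line, keeping the original line order."""
    return [line.split() for line in text.splitlines()]


def process_files(file_names, processes=2):
    """Read sequentially in the parent, then parallelize only the computation."""
    # One file at a time, so the reads are not interleaved on disk.
    texts = [read_whole_file(name) for name in file_names]
    pool = multiprocessing.Pool(processes)
    try:
        # pool.map returns results in the same order as its input,
        # so the per-file results stay in file order.
        return pool.map(split_lines, texts)
    finally:
        pool.close()
        pool.join()
```

Calling process_files(["r1_200909.log", "r1_200910.log"]) under an if __name__ == "__main__": guard would give one list of split lines per file, in the order the file names were passed.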

Second, you're hitting the overhead of Python's multiprocessing module. It doesn't actually use threads; it starts multiple processes and serializes the results through a pipe. That's very slow for bulk data. In fact, it seems to be slower than the work you're doing in the worker (at least in the example). This is the real-world problem caused by the GIL.
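To get a feel for that serialization cost, here is a small sketch that pickles and unpickles data shaped like container.items(). multiprocessing uses pickle to move results between processes; the helper names build_result and measure_pickle_round_trip and the synthetic field values are assumptions for illustration, and the code targets Python 3. It only measures the pickle round trip, not the pipe transfer itself.

```python
import pickle
import time


def build_result(n_lines):
    """Build synthetic data shaped like container.items() from the question."""
    lines = [["field%d" % i, "value%d" % i] for i in range(n_lines)]
    # The first line read has number 1, so it lands in "odd".
    return [("odd", lines[0::2]), ("even", lines[1::2])]


def measure_pickle_round_trip(result):
    """Return (payload size in bytes, seconds spent on dumps + loads)."""
    start = time.time()
    payload = pickle.dumps(result, pickle.HIGHEST_PROTOCOL)
    restored = pickle.loads(payload)
    elapsed = time.time() - start
    assert restored == result  # the round trip must not change the data
    return len(payload), elapsed


if __name__ == "__main__":
    size, seconds = measure_pickle_round_trip(build_result(200000))
    print("pickled %d bytes in %.3f s" % (size, seconds))
```

Comparing the printed time with how long the splitting itself takes should show whether the copy or the computation dominates for your data.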

If I modify do() to return None instead of container.items(), which avoids the extra data copy, this example is faster than a single thread, as long as the files are already cached:

Two threads: 0.36elapsed 168%CPU

One thread (replace pool.map with map): 0:00.52elapsed 98%CPU
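For reference, the do() change behind those timings might look roughly like this. It is a sketch in Python 3, not the exact code that was timed; fill_container, do_without_copy and summarize are hypothetical names. The worker does all the reading and splitting but returns either None or a small summary, so nothing bulky has to be pickled on the way back.

```python
from collections import defaultdict


def fill_container(lines):
    """Split each line and store it under "odd" or "even" by line number."""
    container = defaultdict(list)
    for line_number, line in enumerate(lines, 1):
        key = "odd" if line_number % 2 else "even"
        container[key].append(line.split())
    return container


def do_without_copy(file_name):
    """Do all the work, but return None so no bulk data is sent back."""
    with open(file_name) as f:
        fill_container(f)
    return None


def summarize(container):
    """A small result that is cheap to send back: line counts per key."""
    return dict((key, len(rows)) for key, rows in container.items())
```

This is only useful as a benchmark or when the worker can reduce the data to something small; if the parent needs every line, the copy still has to happen somewhere.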

Unfortunately, the GIL problem is fundamental and can't be worked around from inside Python.
