如何解决使用Pool.map()进行多处理时的内存问题?

问题描述

我已经将程序(如下)写到:

  • 将大型文本文件读取为pandas dataframe
  • 然后groupby使用特定列值拆分数据并存储为数据帧列表。
  • 然后通过管道将数据传送到multiprocess Pool.map()以并行处理每个数据帧。

一切都很好,程序在我的小测试数据集上运行良好。但是,当我输入我的大数据(大约14 GB)时,内存消耗呈指数级增加,然后冻结计算机或被杀死(在HPC集群中)。

我已经添加了代码,以便在数据/变量无用时立即清除内存。一做好,我也会马上把游泳池关起来。对于14 GB的输入,我原本预计只有2*14 GB的内存负担,但是看起来有很多问题。我还尝试使用chunkSize and maxTaskPerChild, etc进行调整,但我没有看到两个测试与大文件在优化方面有什么不同。

我认为当我启动multiprocessing时,需要在此代码位置改进此代码。

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) 但是,我要发布整个代码。

测试示例:我创建了一个大小为250mb的测试文件("genome_Matrix_inal-chr1234-1mb.txt"),并运行了该程序。当我检查系统监视器时,我可以看到内存消耗增加了大约6 GB。我不太清楚为什么250MB的文件加上一些输出占用了这么多的内存空间。我已经通过Dropbox分享了那个文件,如果它有助于看到真正的问题。https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

有没有人能建议一下,我怎样才能摆脱这个问题?

我的python脚本:

#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('
').split('	')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='	', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='	', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '	'.join(header) + '
'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='	', index=False)

    matrix_df = matrix_df.rstrip('
').split('
')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('	')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('
Mining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '	'.join(sample_data_for_vcf)
        updated_line = '	'.join(line_split[0:3]) + '	' + ','.join(alt_up) + 
            '	' + sample_data_for_vcf + '
'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('	Worker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='	')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

赏金猎人更新:

我使用Pool.map()实现了多处理,但是代码造成了很大的内存负担(输入测试文件~300mb,但内存负担约为6 GB)。我原本预计最多只有3*300MB的内存负担。

  • 有没有人能解释一下,这么小的文件,这么小的计算,需要这么大的内存是怎么回事?
  • 另外,我正在尝试接受答案,并使用它来改进我的大型程序中的多进程。因此,添加任何不过多改变计算部分(CPU绑定进程)结构的方法、模块都可以。
  • 我包含了两个测试文件,用于测试代码。
  • 所附代码是完整代码,因此在复制粘贴时应该可以正常工作。任何更改都应仅用于改进多处理步骤中的优化。

解决方案

先决条件

  1. 在Python中(在下面我使用的是Python3.6.5的64位版本),所有东西都是对象。这有它的开销,使用getsizeof我们可以精确地看到对象的大小(以字节为单位):

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. 使用fork系统调用(默认在*nix上,请参见multiprocessing.get_start_method())创建子进程时,不复制父进程的物理内存,并使用copy-on-write技术。
  3. 派生子进程仍将报告父进程的全部RSS(驻留集大小)。因此,PSS(比例集大小)更适合用来估计分叉应用程序的内存使用情况。以下是该页面中的一个示例:
  • 进程A有50 KiB的非共享内存
  • 进程B有300 KiB的非共享内存
  • 进程A和进程B都有100 KiB的相同共享内存区

由于PSS定义为一个进程的非共享内存和与其他进程共享内存的比例之和,所以这两个进程的PSS如下:

  • 进程A的PSS=50 KiB+(100 KiB/2)=100 KiB
  • 进程B的PSS=300 KiB+(100 KiB/2)=350 KiB

数据框

不让我们单独看一下您的DataFramememory_profiler将对我们有所帮助。

jupd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('
').split('	')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='	', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

现在让我们使用探查器:

mprof run justpd.py
mprof plot

我们可以看到曲线图:

和逐行跟踪:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('
').split('	')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='	', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

我们可以看到,数据帧在构建过程中占用了~2GiB,峰值在~3GiB。更有趣的是info的输出。

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

info(memory_usage='deep')("深度"是指通过查询objectdtype来深入自省数据,见下文)给出:

memory usage: 7.9 GB

嗯?!从流程外部看,我们可以确保memory_profiler的数字是正确的。sys.getsizeof还显示了框架的相同值(很可能是因为自定义的__sizeof__),使用该值估计分配的gc.get_objects()的其他工具也是如此,例如pympler

# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

给予:

                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

那么这7.93GiB是从哪里来的呢?让我们试着解释一下这一点。我们有4M行和34列,即134M值。它们可以是int64object(这是一个64位指针,详细说明请参阅using pandas with large data)。因此,我们仅对数据帧中的值有134 * 10 ** 6 * 8 / 2 ** 20~1022 MiB。剩余的~6.93 GiB怎么办?

字符串插入

要理解该行为,有必要了解Python执行字符串内嵌。有两篇关于Python2中字符串嵌入的好文章(one,two)。除了Python3中的Unicode更改和Python3.3中的PEP 393之外,C结构也发生了变化,但思路是相同的。基本上,Python将在内部字典中缓存每个看起来像标识符的短字符串,并且引用将指向相同的Python对象。换句话说,我们可以说它的行为就像一个单例。我在上面提到的文章解释了它提供了哪些显著的内存配置文件和性能改进。我们可以使用PyASCIIObjectinterned字段检查字符串是否被占用:

import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

然后:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

使用两个字符串,我们还可以进行身份比较(如果是CPython,则在内存比较中寻址)。

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True
正因为如此,对于objectdtype,数据帧最多分配20个字符串(每个氨基酸一个)。不过,值得注意的是,Pandas推荐categorical types用于枚举。

pandas 内存

这样我们就可以解释7.93GiB的天真估计,如下所示:

>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953
请注意,str_size是58个字节,而不是我们上面看到的1个字符文字的50个字节。这是因为PEP393定义了紧凑和非紧凑字符串。您可以通过sys.getsizeof(gen_matrix_df.REF[0])查看。

实际内存消耗应该是gen_matrix_df.info()报告的~1 GiB,是它的两倍。我们可以假设这与Pandas或NumPy进行的内存(预)分配有关。下面的实验表明这不是没有原因的(多次运行显示保存画面):

Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('
').split('	')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='	', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

我想引用" pandas "原作者fresh article about design issues and future Pandas2中的一句话来结束本节。

pandas 的经验法则:拥有数据集大小的5到10倍的RAM

进程树

最后,让我们来到池中,看看是否可以使用写入时复制(copy-on-write)。我们将使用smemstat(可从Ubuntu存储库获得)来估计进程组内存共享,并使用glances写下系统范围内的空闲内存。两者都可以编写JSON。

我们将使用Pool(2)运行原始脚本。我们需要3个终端窗口。

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

mprof plot生成:

总和图表(mprof run --nopython --include-children ./script.py)如下所示:

请注意,上面的两个图表显示了RSS。假设是因为写时复制,所以它不能反映实际的内存使用情况。现在我们有来自smemstatglances的两个JSON文件。我将使用以下脚本将JSON文件转换为CSV。

#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

首先让我们看一下free内存。

第一和最小值之间的差值为~4.15GiB。以下是PSS数据的外观:

和总和:

因此我们可以看到,由于写入时复制,实际内存消耗约为4.15GiB。但我们仍在序列化数据,以便通过Pool.map将其发送到工作进程。我们是否也可以在此处利用写入时复制功能?

共享数据

要使用写入时复制,我们需要使list(gen_matrix_df_list.values())可以全局访问,以便分支之后的Worker仍然可以读取它。

  1. 我们在main中的del gen_matrix_df之后修改代码,如下所示:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. 删除后面的del gen_matrix_df_list
  3. 并修改matrix_to_vcf的第一行LIKE:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

现在让我们重新运行它。可用内存:

流程树:

及其总和:

因此,我们的实际内存使用量最大为约2.9 GiB(主进程在构建数据帧时达到的峰值),写入时复制提供了帮助!

作为附注,存在所谓的读取时复制(copy-on-read),即Python的引用循环垃圾收集器described in Instagram Engineering的行为(这导致了issue31558中的gc.freeze)。但gc.disable()在此特定情况下没有影响。

更新

写时拷贝无拷贝数据共享的另一种选择是使用numpy.memmap从头开始将其委托给内核。下面是PythonTalk中高性能数据处理中的an example implementation。然后tricky part使 pandas 使用mmaped Numpy数组。

相关文章