在Linux上实现大型数据帧的Python多处理

2022-04-10 00:00:00 python pandas python-multiprocessing

问题描述

如标题所示,我有一个大数据帧(df)需要按行处理,因为df很大(6 GB),我想利用multiprocessing包来加速,下面是一个玩具示例,考虑到我的写作技巧和任务的复杂性,我将简要描述我想要实现的目标,并列出代码的细节。

原始数据是df,我想从其中执行一些逐行分析(顺序并不重要),它不仅需要焦点行本身,还需要满足特定条件的其他行。以下是玩具数据和我的代码,

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})

df['row_id'] = df.index

print(df)

    value district region  row_id
0       8    upper      A       0
1       4    upper      A       1
2       0    upper      A       2
3       3    upper      A       3
4       0    upper      A       4
5       0     down      A       5
6       3     down      A       6
7       7     down      A       7
8       1     down      A       8
9       7     down      A       9
10      7    upper      B      10
11      3    upper      B      11
12      9    upper      B      12
13      8    upper      B      13
14      2    upper      B      14
15      4     down      B      15
16      5     down      B      16
17      3     down      B      17
18      5     down      B      18
19      3     down      B      19
20      3    upper      C      20
21      1    upper      C      21
22      3    upper      C      22
23      0    upper      C      23
24      3    upper      C      24
25      2     down      C      25
26      0     down      C      26
27      1     down      C      27
28      1     down      C      28
29      0     down      C      29
我想要做的是添加另外两个列<[2-4]和count_a,它们只计算落在相同的<[2-6]和district子集内的范围(Value-2,Value)和(Value,Value+2)的行数,例如, 第row_id==0行的count_b应为0,因为region=='A'district == 'upper'中没有值7,这两行都属于(8-2,8)。因此,所需的输出应为:

   count_a count_b region row_id
0        0       0      A      0
1        0       1      A      1
2        0       0      A      2
3        1       0      A      3
4        0       0      A      4
5        1       0      A      5
6        0       0      A      6
7        0       0      A      7
8        0       1      A      8
9        0       0      A      9
10       1       0      B     10
11       0       1      B     11
12       0       1      B     12
13       1       1      B     13
14       1       0      B     14
15       2       2      B     15
16       0       1      B     16
17       1       0      B     17
18       0       1      B     18
19       1       0      B     19
20       0       0      C     20
21       0       1      C     21
22       0       0      C     22
23       1       0      C     23
24       0       0      C     24
25       0       2      C     25
26       2       0      C     26
27       1       2      C     27
28       1       2      C     28
29       2       0      C     29

问题1:此类任务是否可以矢量化?

问题2:如何使用multiprocessing加快速度(已解决)?

我决定使用multiprocessing,因为我不确定如何通过矢量化来实现这一点。解决方案是(根据提供的答案)

多进程

def b_a(input_df,r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]

    print('length of sliced dataframe: ' + str(len(sub_df)))

    print(r_d[0],r_d[1])


    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])

    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
            print(temp_a)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a),len(temp_b))

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a

r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))


if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list)))) # S3

    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
    # out = P.starmap(b_a,zip(df,r_d_list)) # S1

    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)

    final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))

由于使用P.starmap(以及P.map)需要向函数提供b_a的所有可能的对参数,因此解决方案S1不起作用,因为zip(df,r_d_list)实际上会在df的列名和r_d_list中的元素之间产生压缩,这将导致错误AttributeError: 'str' object has no attribute 'loc',因为函数b_ainput_df字面上是一个字符串(列名df),可以通过查看print('length of input dataframe: ' + str(len(input_df)))的输出进行验证,该输出将生成input_df(在本例中为df)的列名长度。可接受的答案通过创建与参数列表(r_d_list)具有相同长度的引用数组(S2)(不确定具体是什么)来纠正这一点。这个解决方案很有效,但当df较大时可能会很慢,因为根据我个人的理解,它需要搜索每个参数对(regiondistrcit)的整个数据帧,所以我提出了一个修改的版本,根据regiondistrcit将数据分成块,然后在每个块内搜索,而不是在整个数据帧内搜索(S3)。对我来说,这个解决方案在运行时间方面将性能提高了20%,代码见下文:

region = df['region'].unique()

chunk_numbers = 3

chunk_region = math.ceil(len(region) / chunk_numbers)

chunks = list()

r_d_list = list()

row_count = 0

for i in range(chunk_numbers):

    print(i)

    if i < chunk_numbers-1:
        regions = region[(i*chunk_region):((i+1)*chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    row_count = row_count + len(chunks[i])
    print(row_count)
添加到print(df)def b_a()之间,并记住注释掉if __name__ == '__main__'之前的r_d_list = ...

感谢这个精彩的社区,我现在有了一个可行的解决方案,我更新了我的问题,为将来可能遇到同样问题的人提供一些材料,并更好地制定问题以获得更好的解决方案。


解决方案

我认为这里还有改进的空间。我建议您在groupby

中定义一个函数
import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3000,
                   'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
                   'row_id': np.arange(N)})

以下函数为给定组中的每一行返回count_acount_b

def fun(vec):
    out = []
    for i, v in enumerate(vec):
        a = vec[:i] + vec[i+1:]
        count_a = np.isin(a, [v-2, 2]).sum()
        count_b = np.isin(a, [v, v+2]).sum()
        out.append([count_a, count_b])
    return out

pandas

%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]
                               .apply(lambda x: fun(x))
                               .explode().apply(pd.Series)
                               .reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s

任务

现在您需要重新创建df,然后可以使用dask。这是我想到的第一件事。当然有更好/更快的方法。

ddf = dd.from_pandas(df, npartitions=os.cpu_count())

df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]
                                .apply(lambda x: fun(x.tolist()),
                                       meta=('x', 'f8'))
                                .compute(scheduler='processes')
                                .explode().apply(pd.Series)
                                .reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s

多处理

在这种情况下,您还需要创建df。这里的诀窍是将df拆分为dfdf列表。

import multiprocessing as mp
def parallelize(fun, vec, cores):
    with mp.Pool(cores) as p:
        res = p.map(fun, vec)
    return res

def par_fun(d):
    d = d.reset_index(drop=True)
    o = pd.DataFrame(fun(d["value"].tolist()),
                     columns=["count_a", "count_b"])
    return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]

out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s
最终,您可以使用numba改进您的功能。

相关文章