在Linux上实现大型数据帧的Python多处理
问题描述
如标题所示,我有一个大数据帧(df
)需要按行处理,因为df
很大(6 GB),我想利用multiprocessing
包来加速,下面是一个玩具示例,考虑到我的写作技巧和任务的复杂性,我将简要描述我想要实现的目标,并列出代码的细节。
df
,我想从其中执行一些逐行分析(顺序并不重要),它不仅需要焦点行本身,还需要满足特定条件的其他行。以下是玩具数据和我的代码,
import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math
# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
'district': (['upper'] * 5 + ['down'] * 5) * 3,
'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})
df['row_id'] = df.index
print(df)
value district region row_id
0 8 upper A 0
1 4 upper A 1
2 0 upper A 2
3 3 upper A 3
4 0 upper A 4
5 0 down A 5
6 3 down A 6
7 7 down A 7
8 1 down A 8
9 7 down A 9
10 7 upper B 10
11 3 upper B 11
12 9 upper B 12
13 8 upper B 13
14 2 upper B 14
15 4 down B 15
16 5 down B 16
17 3 down B 17
18 5 down B 18
19 3 down B 19
20 3 upper C 20
21 1 upper C 21
22 3 upper C 22
23 0 upper C 23
24 3 upper C 24
25 2 down C 25
26 0 down C 26
27 1 down C 27
28 1 down C 28
29 0 down C 29
我想要做的是添加另外两个列<[2-4]和count_a
,它们只计算落在相同的<[2-6]和district
子集内的范围(Value-2,Value)和(Value,Value+2)的行数,例如,
第row_id==0
行的count_b
应为0,因为region=='A'
和district == 'upper'
中没有值7,这两行都属于(8-2,8)。因此,所需的输出应为:
count_a count_b region row_id
0 0 0 A 0
1 0 1 A 1
2 0 0 A 2
3 1 0 A 3
4 0 0 A 4
5 1 0 A 5
6 0 0 A 6
7 0 0 A 7
8 0 1 A 8
9 0 0 A 9
10 1 0 B 10
11 0 1 B 11
12 0 1 B 12
13 1 1 B 13
14 1 0 B 14
15 2 2 B 15
16 0 1 B 16
17 1 0 B 17
18 0 1 B 18
19 1 0 B 19
20 0 0 C 20
21 0 1 C 21
22 0 0 C 22
23 1 0 C 23
24 0 0 C 24
25 0 2 C 25
26 2 0 C 26
27 1 2 C 27
28 1 2 C 28
29 2 0 C 29
问题1:此类任务是否可以矢量化?
问题2:如何使用multiprocessing
加快速度(已解决)?
我决定使用multiprocessing
,因为我不确定如何通过矢量化来实现这一点。解决方案是(根据提供的答案)
多进程
def b_a(input_df,r_d):
print('length of input dataframe: ' + str(len(input_df)))
# print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
print('length of sliced dataframe: ' + str(len(sub_df)))
print(r_d[0],r_d[1])
b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
for id in sub_df['row_id']:
print('processing row: ' + str(id))
focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
temp_b = sub_df.loc[
(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
temp_a = sub_df.loc[
(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
if len(temp_a):
temp_a['count_a'] = temp_a['row_id'].count()
else:
temp_a = temp_a.append(pd.Series(), ignore_index=True)
temp_a = temp_a.reindex(
columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
print(temp_a)
if len(temp_b):
temp_b['count_b'] = temp_b['row_id'].count()
else:
temp_b = temp_b.append(pd.Series(), ignore_index=True)
temp_b = temp_b.reindex(
columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
print(len(temp_a),len(temp_b))
temp_b.drop_duplicates('count_b', inplace=True)
temp_a.drop_duplicates('count_a', inplace=True)
temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
temp_a[['count_a']].reset_index(drop=True)], axis=1)
temp['row_id'] = id
temp['region'] = str(r_d[0])
b_a = pd.concat([b_a, temp])
return b_a
r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))
if __name__ == '__main__':
P = Pool(3)
out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
list(itertools.chain.from_iterable(r_d_list)))) # S3
# out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
# out = P.starmap(b_a,zip(df,r_d_list)) # S1
# print(out)
P.close()
P.join()
final = pd.concat(out, ignore_index=True)
print(final)
final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))
由于使用P.starmap
(以及P.map
)需要向函数提供b_a
的所有可能的对参数,因此解决方案S1
不起作用,因为zip(df,r_d_list)
实际上会在df
的列名和r_d_list
中的元素之间产生压缩,这将导致错误AttributeError: 'str' object has no attribute 'loc'
,因为函数b_a
的input_df
字面上是一个字符串(列名df),可以通过查看print('length of input dataframe: ' + str(len(input_df)))
的输出进行验证,该输出将生成input_df
(在本例中为df
)的列名长度。可接受的答案通过创建与参数列表(r_d_list
)具有相同长度的引用数组(S2
)(不确定具体是什么)来纠正这一点。这个解决方案很有效,但当df
较大时可能会很慢,因为根据我个人的理解,它需要搜索每个参数对(region
和distrcit
)的整个数据帧,所以我提出了一个修改的版本,根据region
和distrcit
将数据分成块,然后在每个块内搜索,而不是在整个数据帧内搜索(S3)。对我来说,这个解决方案在运行时间方面将性能提高了20%,代码见下文:
region = df['region'].unique()
chunk_numbers = 3
chunk_region = math.ceil(len(region) / chunk_numbers)
chunks = list()
r_d_list = list()
row_count = 0
for i in range(chunk_numbers):
print(i)
if i < chunk_numbers-1:
regions = region[(i*chunk_region):((i+1)*chunk_region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
else:
regions = region[(i * chunk_region):len(region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
row_count = row_count + len(chunks[i])
print(row_count)
添加到print(df)
和def b_a()
之间,并记住注释掉if __name__ == '__main__'
之前的r_d_list = ...
。
感谢这个精彩的社区,我现在有了一个可行的解决方案,我更新了我的问题,为将来可能遇到同样问题的人提供一些材料,并更好地制定问题以获得更好的解决方案。
解决方案
我认为这里还有改进的空间。我建议您在groupby
import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
'district': (['upper'] * 5 + ['down'] * 5) * 3000,
'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
'row_id': np.arange(N)})
以下函数为给定组中的每一行返回count_a
和count_b
def fun(vec):
out = []
for i, v in enumerate(vec):
a = vec[:i] + vec[i+1:]
count_a = np.isin(a, [v-2, 2]).sum()
count_b = np.isin(a, [v, v+2]).sum()
out.append([count_a, count_b])
return out
pandas
%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]
.apply(lambda x: fun(x))
.explode().apply(pd.Series)
.reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s
任务
现在您需要重新创建df
,然后可以使用dask
。这是我想到的第一件事。当然有更好/更快的方法。
ddf = dd.from_pandas(df, npartitions=os.cpu_count())
df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]
.apply(lambda x: fun(x.tolist()),
meta=('x', 'f8'))
.compute(scheduler='processes')
.explode().apply(pd.Series)
.reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s
多处理
在这种情况下,您还需要创建df
。这里的诀窍是将df
拆分为df
的df
列表。
import multiprocessing as mp
def parallelize(fun, vec, cores):
with mp.Pool(cores) as p:
res = p.map(fun, vec)
return res
def par_fun(d):
d = d.reset_index(drop=True)
o = pd.DataFrame(fun(d["value"].tolist()),
columns=["count_a", "count_b"])
return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]
out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s
最终,您可以使用numba
改进您的功能。
相关文章