如何在具有500万行和50万组的Dask数据帧上加速groupby().sum()?
问题描述
我有一个包含
的数据帧- 500万行。
- 唯一元素数为500.000的列
group_id
。 - 名为
var1
、var2
等的数千个其他列。var1
、var2
、.仅包含0和1。
我想按group_id
分组,然后将它们相加。为了有更好的性能,我使用Dask。但是,此简单聚合的速度仍然较慢。
The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds
我的实际数据集最多包含30.000列。我已经阅读了@Divakar关于使用numpy的答案(1和2)。但是,前一个线程是关于计数的,而后者是关于列求和的。
您能否详细说明加速此聚合的一些方法?
import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy
import dask.dataframe as dd
core = os.cpu_count()
P = dummy.Pool(processes = core)
n_docs = 500000
n_rows = n_docs * 10
data = {}
def create_col(i):
name = 'var' + str(i)
data[name] = np.random.randint(0, 2, n_rows)
n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype = 'int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))
df = dd.from_pandas(df, npartitions = 3 * core)
start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')
解决方案
(我误解了原始答案中的OP,因此全部清除)。
我通过以下方式获得改进:
- 切换到Numpy
- 对组和数据使用相同的数据类型(np.int32)
- 以并行模式使用Numba"
import numba as nb
@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
n_cols = group_and_data.shape[1] - 1
counts = np.zeros((n_groups, n_cols), dtype=np.int32)
for idx in nb.prange(len(group_and_data)):
row = group_and_data[idx]
counts[row[0]] += row[1:]
return counts
df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)
# switching to numpy (line below) is costly
# it would be faster to work with numpy alone (no pandas)
group_and_data = df.values
count_groups2(group_and_data)
op_method(df)
72 1 1439807.0 1439807.0 7.0 group_and_data = df.values
73 1 1341527.0 1341527.0 6.5 count_groups2(group_and_data, n_groups=500_000)
74 1 12043334.0 12043334.0 58.5 op_method(df)
相关文章