Efficiently applying a function to a grouped pandas DataFrame in parallel
Problem description
I often need to apply a function to the groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.
I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.
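For reference, the approach described above might look like the following minimal sketch (the names `group_sum` and `parallel_group_apply` are hypothetical). `Pool.map` pickles every `(key, group)` tuple on the way to a worker and every result on the way back, which is exactly the overhead in question.

```python
import multiprocessing as mp

import pandas as pd


def group_sum(item):
    """Hypothetical per-group function: sum column 'b' of one group."""
    key, group = item
    return key, int(group["b"].sum())


def parallel_group_apply(df, by, func, processes=4):
    """Apply func to every (key, group) pair using a process pool.

    Each (key, group) tuple is pickled and sent to a worker, and each
    result is pickled back to the parent: this per-group copying is what
    makes the approach slow for very large DataFrames.
    """
    with mp.Pool(processes) as pool:
        results = pool.map(func, df.groupby(by))
    return dict(results)
```

On platforms that start workers with `spawn`, `parallel_group_apply` should be called from under an `if __name__ == "__main__":` guard, and `func` must be a module-level function so that it can be pickled.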
Is there any way to avoid the pickling, or even avoid copying the DataFrame completely? It looks like the shared memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?
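To illustrate that limitation, here is a minimal sketch using `multiprocessing.shared_memory` (added in Python 3.8, so newer than this question). A single numeric column can be placed in a shared block and viewed by a worker without pickling the data, but only as a raw buffer wrapped in a NumPy array, not as a DataFrame. The helper names `share_column` and `attach_column` are hypothetical; in real use `attach_column` would run in another process, which receives only the block's name and shape.

```python
from multiprocessing import shared_memory

import numpy as np


def share_column(values):
    """Copy a non-empty 1-D array into a new shared-memory block as int64 (one copy, once)."""
    values = np.ascontiguousarray(values, dtype=np.int64)
    shm = shared_memory.SharedMemory(create=True, size=values.nbytes)
    view = np.ndarray(values.shape, dtype=np.int64, buffer=shm.buf)
    view[:] = values
    return shm, values.shape


def attach_column(name, shape):
    """What a worker would do: attach to the block by name and wrap it, without copying."""
    shm = shared_memory.SharedMemory(name=name)
    return shm, np.ndarray(shape, dtype=np.int64, buffer=shm.buf)
```

Every NumPy view of a block must be released before `close()` is called, and the creating process should call `unlink()` once all users are done.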
Solution
From the comments above, it seems that this is planned for pandas at some point (there's also an interesting-looking rosetta project which I just noticed).
However, until all parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using Cython + OpenMP and C++.
Here's a short example of writing a parallel groupby-sum, whose use is something like this:
import pandas as pd
import para_group_demo  # the compiled Cython extension whose source follows

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print(para_group_demo.sum(df.a, df.b))
The output is:
     sum
key
0      6
1     11
2      4
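A plain single-threaded pandas computation, shown here only as an assumed reference for checking the extension, gives the same numbers:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 1, 2, 1, 1, 0], "b": range(7)})

# Single-threaded reference: group 'b' by the keys in 'a' and sum each group,
# then shape the result like the extension's output (index 'key', column 'sum').
expected = df.groupby("a")["b"].sum().rename("sum").to_frame()
expected.index.name = "key"
print(expected)
```

Because the extension builds its result from an `unordered_map`, its rows may come back in any order, so it should be compared against this reference after calling `sort_index()` on it.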
Note: Doubtless, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this with pandas.
To do this, I wrote a simple single-source-file extension whose code follows.
It starts with some imports and type definitions:
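# distutils: language = c++
# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
# (The -fopenmp flags are for GCC/Clang; MSVC uses /openmp instead.)
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd

# key -> running sum; signed sums so that negative values are handled correctly
ctypedef unordered_map[int64_t, int64_t] counts_t
ctypedef unordered_map[int64_t, int64_t].iterator counts_it_t
# one counts_t per thread
ctypedef vector[counts_t] counts_vec_t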
The C++ unordered_map type is for summing by a single thread, and the vector, which holds one such map per thread, is for summing by all threads.
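In plain Python terms, this layout is roughly a list holding one dictionary per thread. Each thread writes only to its own dictionary, so no locking is needed while summing. A minimal sequential sketch, with Python dictionaries standing in for the C++ containers (the names `make_partials` and `accumulate` are hypothetical):

```python
from collections import defaultdict


def make_partials(num_threads):
    """One key -> running-sum map per thread (the role of counts_vec_t)."""
    return [defaultdict(int) for _ in range(num_threads)]


def accumulate(partials, thread_id, keys, vals, start, stop):
    """Sum vals[start:stop] by key into this thread's own map only."""
    mine = partials[thread_id]
    for j in range(start, stop):
        mine[keys[j]] += vals[j]
```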
Now to the function sum. It starts off with typed memory views for fast access:
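def sum(crit, vals):
    # Both columns must have dtype int64. const memoryviews also accept
    # read-only arrays (e.g. those returned under pandas Copy-on-Write).
    cdef const int64_t[:] crit_view = crit.values
    cdef const int64_t[:] vals_view = vals.values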
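These typed memoryviews accept only a buffer whose element type is exactly `int64_t`, so the caller must pass int64 columns. When a column already is contiguous int64, its `.values` is a view of the column's data rather than a copy. A minimal Python-side sketch of preparing such a column (the helper name `as_int64_buffer` is hypothetical):

```python
import numpy as np
import pandas as pd


def as_int64_buffer(series):
    """Return the column's data as a contiguous int64 NumPy array.

    If the column is already contiguous int64, no copy is made; otherwise
    NumPy converts it once, before any parallel work starts.
    """
    return np.ascontiguousarray(series.to_numpy(), dtype=np.int64)
```

The resulting array may be read-only under pandas Copy-on-Write, and Cython can bind a read-only buffer only to a `const`-qualified memoryview.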
The function continues by dividing the rows semi-equally among the threads (here hardcoded to 4) and having each thread sum the entries in its range:
    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    # Chunk size, rounded up so the num_threads chunks cover all l rows
    cdef uint64_t s = l // num_threads + 1
    cdef uint64_t i, j, e
    # One key -> sum map per thread
    cdef counts_vec_t counts = counts_vec_t(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True, num_threads=num_threads):
            # Thread i sums rows [i * s, min((i + 1) * s, l)) into counts[i] only
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)
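The index arithmetic in that loop can be checked in isolation. This plain-Python sketch (the helper name `thread_ranges` is hypothetical) reproduces the row range each thread uses, with chunk size `l // num_threads + 1` and the end clipped to `l`. The ranges are disjoint and together cover every row exactly once; when the division does not come out evenly, the last threads get shorter or even empty ranges, which is why the split is only semi-equal.

```python
def thread_ranges(l, num_threads):
    """Return the half-open [start, stop) row range each thread sums over."""
    s = l // num_threads + 1  # chunk size, rounded up so the chunks cover all l rows
    ranges = []
    for i in range(num_threads):
        start = i * s
        stop = min(start + s, l)
        ranges.append((start, max(start, stop)))  # empty range if start >= l
    return ranges
```

For example, `thread_ranges(7, 4)` gives `[(0, 2), (2, 4), (4, 6), (6, 7)]`.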
When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:
    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)
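The merge step corresponds to the following plain-Python sketch (the name `merge_partials` is hypothetical). The per-thread maps are folded into one total, key by key, in a single thread after the parallel section, so this step needs no synchronization.

```python
from collections import Counter


def merge_partials(partials):
    """Fold the per-thread key -> sum maps into a single total map."""
    total = Counter()
    for partial in partials:
        for key, value in partial.items():
            total[key] += value
    return dict(total)
```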
All that's left is to create a DataFrame and return the results:
    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)
    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
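Since `unordered_map` iteration order is unspecified, the rows of the returned DataFrame can appear in any order. A small Python sketch of the same construction (the name `totals_to_frame` is hypothetical) adds a `sort_index()` call, as an assumption, for deterministic output:

```python
import pandas as pd


def totals_to_frame(total):
    """Build a DataFrame indexed by 'key' with one 'sum' column from a key -> sum map."""
    keys = list(total.keys())
    sums = [total[k] for k in keys]
    df = pd.DataFrame({"key": keys, "sum": sums}).set_index("key")
    # Assumption: sort for deterministic output; the Cython version returns map order.
    return df.sort_index()
```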