Parallelizing Pandas df.iterrows()
Question
I would like to parallelize the following code:
for row in df.iterrows():
    idx = row[0]
    k = row[1]['Chromosome']
    start, end = row[1]['Bin'].split('-')
    sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from http
    df.set_value(idx, 'GC%', gc_content(sequence, percent=False, verbose=False))
    df.set_value(idx, 'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx, 'max flexibility', max([item[1] for item in dna_flex(sequence, verbose=False)]))
I have tried to use multiprocessing.Pool(), since each row can be processed independently, but I can't figure out how to share the DataFrame. I am also not sure that this is the best way to parallelize work with pandas. Any help?
Solution
As @Khris said in his comment, you should split your dataframe into a few large chunks and iterate over each chunk in parallel. You could split the dataframe into arbitrarily sized chunks, but it makes more sense to divide it into equally sized chunks based on the number of processes you plan to use. Luckily, someone else has already figured out how to do that part for us:
# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer (at least 1, so range() below gets a valid step)
chunk_size = max(1, df.shape[0] // num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
# (slicing by position also works when the index is not 0..n-1)
chunks = [df.iloc[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
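The slicing above can leave a small extra chunk when the row count is not a multiple of the process count. As a minimal sketch of one alternative (the helper name `chunk_bounds` is hypothetical, not part of the original answer), the boundaries can be computed so that the slices differ in length by at most one row:

```python
def chunk_bounds(n_rows, n_chunks):
    """Return (start, stop) pairs that split range(n_rows) into at most
    n_chunks contiguous, nearly equal slices."""
    n_chunks = max(1, min(n_chunks, n_rows))  # never more chunks than rows
    base, extra = divmod(n_rows, n_chunks)
    bounds = []
    start = 0
    for i in range(n_chunks):
        # the first `extra` chunks take one additional row
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


print(chunk_bounds(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Each pair can then be used positionally, for example `df.iloc[start:stop]`.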
This creates a list that contains our dataframe in chunks. Now we need to pass it into our pool along with a function that will manipulate the data.
def func(d):
    # as a simple example, square every value in the chunk
    return d * d

# create our pool with `num_processes` processes
# (on platforms that spawn workers, such as Windows, run this under `if __name__ == "__main__":`)
with multiprocessing.Pool(processes=num_processes) as pool:
    # apply our function to each chunk in the list
    result = pool.map(func, chunks)
At this point, result will be a list holding each chunk after it has been manipulated. In this case, all values have been squared. The issue now is that the original dataframe has not been modified, so we have to replace all of its existing values with the results from our pool.
for chunk in result:
    # since each chunk is just a dataframe,
    # we can reassign the original dataframe based on the index labels of each chunk
    df.loc[chunk.index] = chunk
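Whether the write-back step goes by position or by label starts to matter once the index is no longer the default 0..n-1, for example after rows have been filtered out. The following is a small sketch with made-up data, assuming the index labels are unique:

```python
import pandas as pd

# A frame whose index is not 0..n-1, e.g. after filtering rows (made-up data).
df = pd.DataFrame({"x": [1, 2, 3, 4]}, index=[10, 20, 30, 40])

# Pretend these squared chunks came back from pool.map.
result = [df.iloc[0:2] ** 2, df.iloc[2:4] ** 2]

for chunk in result:
    # .loc matches rows by index label, so each chunk lands on its own rows
    df.loc[chunk.index] = chunk

print(df["x"].tolist())  # [1, 4, 9, 16]
```

When the worker adds columns that `df` does not have yet, `pd.concat(result)` rebuilds the frame in one step instead.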
Now, my function to manipulate the dataframe is vectorized and would likely have been faster if I had simply applied it to the whole dataframe instead of splitting it into chunks. However, in your case, your function would iterate over each row of each chunk and then return the chunk. This allows you to process num_processes rows at a time.
def func(d):
    for idx, row in d.iterrows():
        k = row['Chromosome']
        start, end = row['Bin'].split('-')
        sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from http
        # DataFrame.set_value was removed in pandas 1.0; .at is its replacement
        d.at[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
        d.at[idx, 'G4 repeats'] = sum(len(list(i)) for i in g4_scanner(sequence))
        d.at[idx, 'max flexibility'] = max(item[1] for item in dna_flex(sequence, verbose=False))
    # return the chunk!
    return d
Then you reassign the values in the original dataframe, and you have successfully parallelized this process.
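Putting the pieces together, a minimal end-to-end sketch might look like the following. The function `fake_features` is a hypothetical stand-in for the slow download and sequence analysis in the question, and the column name `Bin length` is made up for illustration. The pool is created under `if __name__ == "__main__":` so that the script also works on platforms that start workers by spawning a fresh interpreter.

```python
import multiprocessing

import pandas as pd


def fake_features(chromosome, start, end):
    # Hypothetical stand-in for sequence_from_coordinates() plus the analysis.
    return int(end) - int(start)


def process_chunk(chunk):
    # Work on a copy so the caller's frame is left untouched.
    chunk = chunk.copy()
    lengths = []
    for idx, row in chunk.iterrows():
        start, end = row["Bin"].split("-")
        lengths.append(fake_features(row["Chromosome"], start, end))
    chunk["Bin length"] = lengths
    return chunk


def parallel_apply(df, func, num_processes):
    # Assumes df is non-empty. Split into contiguous chunks of similar size.
    chunk_size = max(1, -(-len(df) // num_processes))  # ceiling division
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    with multiprocessing.Pool(processes=num_processes) as pool:
        # pool.map returns results in the same order as the input chunks
        return pd.concat(pool.map(func, chunks))


if __name__ == "__main__":
    df = pd.DataFrame({"Chromosome": ["1", "1", "2"],
                       "Bin": ["0-10", "10-25", "5-9"]})
    print(parallel_apply(df, process_chunk, num_processes=2))
```

Because the worker returns whole chunks, new columns survive the round trip, which a cell-by-cell write-back into the original frame would not guarantee.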
Your optimal performance is going to depend on how many processes you run. While "ALL OF THE PROCESSES!!!!" is one answer, a better answer is much more nuanced. After a certain point, throwing more processes at a problem actually creates more overhead than it's worth. Amdahl's Law describes a related limit: the part of the job that cannot be parallelized caps the overall speedup, however many processes you add. Again, we are fortunate that others have already tackled this question for us:
- Python multiprocessing's Pool process limit
- How many processes should I run in parallel?
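For reference, Amdahl's Law says that if a fraction p of the work can be parallelized, the best possible speedup on n processes is 1 / ((1 - p) + p / n). Here is a small sketch of the formula; the function name is chosen for illustration, and real pools add startup and pickling overhead on top of this bound:

```python
def amdahl_speedup(parallel_fraction, n_processes):
    """Upper bound on speedup when `parallel_fraction` of the work scales perfectly."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_processes)


for n in (1, 2, 4, 8, 16):
    # with 90% parallel work the speedup can never reach 10x
    print(n, round(amdahl_speedup(0.9, n), 2))
```

The returns diminish quickly: each doubling of the process count buys less than the one before it.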
A good default is to use multiprocessing.cpu_count(), which is the default behavior of multiprocessing.Pool. According to the documentation, "If processes is None then the number returned by cpu_count() is used." That's why I set num_processes to multiprocessing.cpu_count() at the beginning. This way, if you move to a beefier machine, you get the benefits from it without having to change the num_processes variable directly.
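One small refinement, offered as a sketch rather than as part of the original answer: there is no point in starting more workers than there are rows to process, so the process count can be capped. The helper name `pick_num_processes` is hypothetical.

```python
import multiprocessing


def pick_num_processes(n_rows, requested=None):
    """Choose a worker count: default to the CPU count, never exceed the row count."""
    if requested is None:
        requested = multiprocessing.cpu_count()
    return max(1, min(requested, n_rows))


print(pick_num_processes(n_rows=3))            # at most 3, even on a many-core machine
print(pick_num_processes(1000, requested=4))   # 4
```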