Making multiprocessing pool.map call functions in a particular order

Problem description

How can I make multiprocessing.pool.map distribute processes in numerical order?

More Info:
I have a program that processes a few thousand data files, making a plot of each one. I'm using multiprocessing.pool.map to distribute the files across processors, and it works great. Sometimes this takes a long time, and it would be nice to look at the output images while the program is running. That would be a lot easier if map handed out the snapshots roughly in order; instead, for the run I just executed, the first 8 snapshots analyzed were 0, 78, 156, 234, 312, 390, 468, 546. Is there a way to make it distribute them closer to numerical order?

Example:
Here's some sample code that contains the same key elements and shows the same basic result:

import sys
import time
from multiprocessing import Pool

num_proc = 4      # number of worker processes
num_calls = 20    # number of "snapshots" to process
sleeper = 0.1     # stand-in for the real per-file work

def SomeFunc(arg):
    time.sleep(sleeper)
    print("%5d" % arg, end=" ")
    sys.stdout.flush()     # otherwise the numbers don't appear promptly on a single line

if __name__ == "__main__":
    proc_pool = Pool(num_proc)
    proc_pool.map(SomeFunc, range(num_calls))

Yields:

   0  4  2  6   1   5   3   7   8  10  12  14  13  11   9  15  16  18  17  19


Answer:

From @Hayden: use the 'chunksize' parameter, def map(self, func, iterable, chunksize=None).

More Info:
The chunksize determines how many iterations are allocated to each processor at a time. My example above, for instance, ends up with a chunksize of 2, which means each processor goes off and works through 2 iterations of the function before coming back for more (a 'check-in'). The trade-off behind chunksize is that each check-in carries overhead, because the processor has to sync up with the others; that argues for a large chunksize. On the other hand, with large chunks one processor might finish its chunk while another still has a long way to go, which argues for a small chunksize. The other useful piece of information is how much the individual function calls vary in duration. If they really all take about the same amount of time, a large chunksize is more efficient; if some calls can take twice as long as others, you want a small chunksize so that processors aren't left waiting.

For my problem, every function call should take very close to the same amount of time (I think), so if I want the processes to be called in order, I'm going to sacrifice efficiency because of the check-in overhead.
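
To put a rough number on that check-in overhead, a minimal sketch along these lines can be timed (the function name uniform_task, the pool size of 4, the 200 calls and the 0.01 s sleep are my own assumptions, not from the original post):

import time
from multiprocessing import Pool

def uniform_task(arg):
    # every call takes roughly the same amount of time
    time.sleep(0.01)
    return arg

if __name__ == "__main__":
    for cs in (1, 2, 10):
        with Pool(4) as pool:
            start = time.time()
            pool.map(uniform_task, range(200), chunksize=cs)
            print("chunksize=%2d: %.2f s" % (cs, time.time() - start))

With uniform task durations the larger chunksizes should come out slightly ahead, which is the efficiency cost mentioned above; randomizing the sleep would tilt things back towards the small chunksize.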


Solution

This happens because each process is given a predefined amount of work at the start of the call to map, and that amount depends on the chunksize. We can work out the default chunksize by looking at the source for pool.map:

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
  chunksize += 1

So for a range of 20, and with 4 processes, we will get a chunksize of 2.
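
As a quick sanity check, that formula can be wrapped in a small helper and evaluated for the numbers above (default_chunksize is my own illustrative name, not part of the multiprocessing API):

def default_chunksize(num_items, num_workers):
    # mirrors the divmod logic quoted from the pool.map source above
    chunksize, extra = divmod(num_items, num_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(20, 4))   # prints 2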

If we modify your code to make this chunksize explicit, we should get results similar to what you are seeing now:

proc_pool.map(SomeFunc, range(num_calls), chunksize=2)

This produces the output:

0 2 6 4 1 7 5 3 8 10 12 14 9 13 15 11 16 18 17 19

Now, setting chunksize=1 will ensure that each process within the pool is only given one task at a time.

proc_pool.map(SomeFunc, range(num_calls), chunksize=1)

This should give a reasonably good numerical ordering compared with not specifying a chunksize at all. For example, a chunksize of 1 yields the output:

0 1 2 3 4 5 6 7 9 10 8 11 13 12 15 14 16 17 19 18
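
If you want to see the pick-up order directly rather than reading it off the interleaved prints, one option is to record it in a shared list; this is purely my own sketch (record_call and the Manager-based bookkeeping are not part of the answer):

import time
from multiprocessing import Pool, Manager

def record_call(args):
    arg, order = args
    time.sleep(0.1)
    order.append(arg)          # proxy to a list held by the manager process
    return arg

if __name__ == "__main__":
    with Manager() as manager:
        for cs in (None, 1):   # None lets pool.map pick the default chunksize
            order = manager.list()
            with Pool(4) as pool:
                pool.map(record_call, [(i, order) for i in range(20)], chunksize=cs)
            print("chunksize=%s -> %s" % (cs, list(order)))

Running it should reproduce the kind of interleaving shown earlier for the default, and something close to numerical order for chunksize=1.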
