Multiprocessing: Understanding the logic behind "chunksize"
Problem description
What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()? The .map() method seems to use an arbitrary heuristic for its default chunksize (explained below); what motivates that choice, and is there a more thoughtful approach based on some particular situation/setup?

Example - say that I am:
- Passing an iterable to .map() that has ~15 million elements;
- Working on a machine with 24 cores and using the default processes = os.cpu_count() within multiprocessing.Pool().
My naive thinking is to give each of 24 workers an equally-sized chunk, i.e. 15_000_000 / 24 or 625,000. Large chunks should reduce turnover/overhead while fully utilizing all workers. But it seems that this is missing some potential downsides of giving large batches to each worker. Is this an incomplete picture, and what am I missing?
Part of my question stems from the default logic for if chunksize=None: both .map() and .starmap() call .map_async(), which looks like this:

```python
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
```
What's the logic behind divmod(len(iterable), len(self._pool) * 4)? This implies that the chunksize will be closer to 15_000_000 / (24 * 4) == 156_250. What's the intention in multiplying len(self._pool) by 4?

This makes the resulting chunksize a factor of 4 smaller than my "naive logic" from above, which consists of just dividing the length of the iterable by the number of workers in pool._pool.

Lastly, there is also this snippet from the Python docs on .imap() that further drives my curiosity:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
Related answer that is helpful but a bit too high-level: Python multiprocessing: why are large chunksizes slower?.
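For reference, replaying the default divmod step with my numbers (~15 million elements, 24 workers) confirms the factor-of-4 difference mentioned above:

```python
# Replaying the default chunksize computation for ~15 million elements and
# 24 worker processes (len(self._pool) == 24).
chunksize, extra = divmod(15_000_000, 24 * 4)
if extra:
    chunksize += 1
print(chunksize)  # 156250 -> roughly four times smaller than the "naive" 625,000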
Solution

Short Answer
Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any specific scenario.
The algorithm arbitrarily divides the iterable into approximately four times more chunks than the naive approach. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to higher worker-utilization on average, but without the guarantee of a shorter overall computation time for every case.
"That's nice to know" you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is, "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.
This answer tries to provide you with basic concepts helping you to get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools at hand for recognizing and avoiding potential cliffs as far as they are related to chunksize.
Table of Contents

Part I

- Definitions
- Parallelization Goals
- Parallelization Scenarios
- Risks of Chunksize > 1
- Pool's Chunksize-Algorithm
- Quantifying Algorithm Efficiency
  - 6.1 Models
  - 6.2 Parallel Schedule
  - 6.3 Efficiencies
    - 6.3.1 Absolute Distribution Efficiency (ADE)
    - 6.3.2 Relative Distribution Efficiency (RDE)

Part II

- Naive vs. Pool's Chunksize-Algorithm
- Reality Check
- Conclusion
It is necessary to clarify some important terms first.
1. Definitions
Chunk

A chunk here is a share of the iterable-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have, is the topic of this answer.
Task

A task's physical representation in a worker-process in terms of data can be seen in the figure below.
The figure shows an example call to pool.map(), displayed along a line of code, taken from the multiprocessing.pool.worker function, where a task read from the inqueue gets unpacked. worker is the underlying main-function in the MainThread of a pool-worker-process. The func-argument specified in the pool-method will only match the func-variable inside the worker-function for single-call methods like apply_async and for imap with chunksize=1. For the rest of the pool-methods with a chunksize-parameter, the processing-function func will be a mapper-function (mapstar or starmapstar). This function maps the user-specified func-parameter on every element of the transmitted chunk of the iterable (--> "map-tasks"). The time this takes defines a task also as a unit of work.
Taskel

While the usage of the word "task" for the whole processing of one chunk is matched by code within multiprocessing.pool, there is no indication how a single call to the user-specified func, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the maxtasksperchild-parameter for Pool's __init__-method), this answer will refer to the single units of work within a task as taskels.

A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksize taskels.
Parallelization Overhead (PO)

PO consists of Python-internal overhead and overhead for inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpacking the tasks and their results. IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about the impact difficult.
2. Parallelization Goals
When using multiprocessing, our overall goal (obviously) is to minimize total processing time for all tasks. To reach this overall goal, our technical goal needs to be optimizing the utilization of hardware resources.
Some important sub-goals for achieving the technical goal are:
- minimize parallelization overhead (most famously, but not alone: IPC)
- high utilization across all cpu-cores
- keeping memory usage limited to prevent the OS from excessive paging (thrashing)
First of all, the tasks need to be computationally heavy (intensive) enough to earn back the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around, the bigger the absolute computation time per taskel for your problem, the less relevant the need for reducing PO becomes. If your computation will take hours per taskel, the IPC overhead will be negligible in comparison. The primary concern here is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means we are parallelizing as much as possible.
3. Parallelization Scenarios
What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()
The major factor in question is how much computation time may vary across our single taskels. To give it a name, the choice of an optimal chunksize is determined by the Coefficient of Variation (CV) for computation times per taskel.
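As a side note, here is a small sketch of the standard CV definition (standard deviation divided by the mean), applied to made-up per-taskel computation times:

```python
# Coefficient of Variation (CV) = standard deviation / mean of per-taskel times.
import statistics

equal_times = [1.0, 1.0, 1.0, 1.0]              # identical per-taskel times -> CV == 0
varying_times = [60, 60, 86400, 60, 86400, 60]  # minutes mixed with a day -> CV >> 0

def cv(times):
    return statistics.pstdev(times) / statistics.mean(times)

print(round(cv(equal_times), 2), round(cv(varying_times), 2))  # 0.0 1.41
```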
The two extreme scenarios on a scale, following from the extent of this variation are:
- All taskels need exactly the same computation time.
- A taskel could take seconds or days to finish.
For better memorability, I will refer to these scenarios as:
- Dense Scenario
- Wide Scenario
Dense Scenario
In a Dense Scenario it would be desirable to distribute all taskels at once, to keep necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker processes. As already stated above, the weight of PO increases with shorter computation times per taskel.
For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.
Wide Scenario
The prime example for a Wide Scenario would be an optimization problem, where results either converge quickly or computation can take hours, if not days. Usually it is not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing fewer taskels at once than possible means increasing scheduling flexibility. This is needed here to reach our sub-goal of high utilization of all cores.
If Pool methods were, by default, totally optimized for the Dense Scenario, they would increasingly create suboptimal timings for every problem located closer to the Wide Scenario.
4. Risks of Chunksize > 1
Consider this simplified pseudo-code example of a Wide Scenario-iterable, which we want to pass into a pool-method:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day. We assume the pool has four worker processes (on four cores) and chunksize is set to 2. Because the order will be kept, the chunks sent to the workers will be these:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
Since we have enough workers and the computation time is high enough, we can say that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast completing tasks.) Further, we can say that the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.
Now consider this iterable, which has only one element switching its position compared to the previous iterable:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
...and the corresponding chunks:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set chunksize=1.

This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.
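For reference, a minimal sketch of how an iterable can be grouped into such chunks (my own helper for illustration, not the actual multiprocessing internals, which additionally wrap chunks into tasks before queueing them):

```python
# Hypothetical helper: group an iterable into tuples of `chunksize` elements;
# the last chunk may be shorter if the length is not evenly divisible.
from itertools import islice

def make_chunks(iterable, chunksize):
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunksize))
        if not chunk:
            return
        yield chunk

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
print(list(make_chunks(bad_luck_iterable, 2)))
# [(60, 60), (86400, 86400), (60, 60), (60, 84600)]
```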
As we will see in chapter 6. Quantifying Algorithm Efficiency, bigger chunksizes can also lead to suboptimal results for Dense Scenarios.
5. Pool's Chunksize-Algorithm
Below you will find a slightly modified version of the algorithm inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the chunksize argument externally. I also replaced 4 with a factor parameter and outsourced the len() calls.

```python
# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
```
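As a quick usage check with the numbers from the question (24 workers, roughly 15 million elements):

```python
print(calc_chunksize(n_workers=24, len_iterable=15_000_000))  # 156250
```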
To ensure we are all on the same page, here's what divmod does: divmod(x, y) is a builtin function which returns (x//y, x%y). x // y is the floor division, returning the down rounded quotient from x / y, while x % y is the modulo operation returning the remainder from x / y. Hence e.g. divmod(10, 3) returns (3, 1).

Now when you look at chunksize, extra = divmod(len_iterable, n_workers * 4), you will notice n_workers here is the divisor y in x / y and multiplication by 4, without further adjustment through if extra: chunksize += 1 later on, leads to an initial chunksize at least four times smaller (for len_iterable >= n_workers * 4) than it would be otherwise.

For viewing the effect of multiplication by 4 on the intermediate chunksize result, consider this function:

```python
def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors
    by which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
```
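To make the factors concrete, here is an example call with my own values (an iterable of length 500 and the default four workers):

```python
cs_naive, cs_pool1, cs_pool2, rf_pool1, rf_pool2 = compare_chunksizes(500)
print(cs_naive, cs_pool1, cs_pool2)            # 125 31 32
print(round(rf_pool1, 3), round(rf_pool2, 3))  # 4.032 3.906
```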
The function above calculates the naive chunksize (cs_naive) and the first-step chunksize of Pool's chunksize-algorithm (cs_pool1), as well as the chunksize for the complete Pool-algorithm (cs_pool2). Further, it calculates the real factors rf_pool1 = cs_naive / cs_pool1 and rf_pool2 = cs_naive / cs_pool2, which tell us how many times the naively calculated chunksizes are bigger than Pool's internal version(s).

Below you see two figures created with output from this function. The left figure just shows the chunksizes for n_workers=4 up until an iterable length of 500. The right figure shows the values for rf_pool1. For iterable length 16, the real factor becomes >=4 (for len_iterable >= n_workers * 4) and its maximum value is 7 for iterable lengths 28-31. That's a massive deviation from the original factor 4 the algorithm converges to for longer iterables. 'Longer' here is relative and depends on the number of specified workers.

Remember, chunksize cs_pool1 still lacks the extra-adjustment with the remainder from divmod contained in cs_pool2 from the complete algorithm.

The algorithm goes on with:

```python
if extra:
    chunksize += 1
```
Now in cases where there is a remainder (an extra from the divmod-operation), increasing the chunksize by 1 obviously cannot work out for every task. After all, if it would, there would not be a remainder to begin with.

As you can see in the figures below, the "extra-treatment" has the effect that the real factor for rf_pool2 now converges towards 4 from below 4 and the deviation is somewhat smoother. Standard deviation for n_workers=4 and len_iterable=500 drops from 0.5233 for rf_pool1 to 0.4115 for rf_pool2.

Eventually, increasing chunksize by 1 has the effect that the last task transmitted only has a size of len_iterable % chunksize or chunksize.

The more interesting and, as we will see later, more consequential effect of the extra-treatment, however, can be observed for the number of generated chunks (n_chunks). For long enough iterables, Pool's completed chunksize-algorithm (n_pool2 in the figure below) will stabilize the number of chunks at n_chunks == n_workers * 4. In contrast, the naive algorithm (after an initial burp) keeps alternating between n_chunks == n_workers and n_chunks == n_workers + 1 as the length of the iterable grows.

Below you will find two enhanced info-functions for Pool's and the naive chunksize-algorithm. The output of these functions will be needed in the next chapter.
```python
# mp_utils.py
from collections import namedtuple

Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
```
Don't be confused by the probably unexpected look of calc_naive_chunksize_info. The extra from divmod is not used for calculating the chunksize.

```python
def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
```
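A quick comparison of both info-functions for the same input (my own example values) makes the roughly fourfold number of chunks visible:

```python
print(calc_chunksize_info(n_workers=4, len_iterable=500))
# Chunkinfo(n_workers=4, len_iterable=500, n_chunks=16, chunksize=32, last_chunk=20)

print(calc_naive_chunksize_info(n_workers=4, len_iterable=500))
# Chunkinfo(n_workers=4, len_iterable=500, n_chunks=4, chunksize=125, last_chunk=125)
```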
6. Quantifying Algorithm Efficiency
Now, after we have seen how the output of Pool's chunksize-algorithm looks different compared to output from the naive algorithm...

- How to tell if Pool's approach actually improves something?
- And what exactly could this something be?
As shown in the previous chapter, for longer iterables (a bigger number of taskels), Pool's chunksize-algorithm approximately divides the iterable into four times more chunks than the naive method. Smaller chunks mean more tasks and more tasks mean more Parallelization Overhead (PO), a cost which must be weighed against the benefit of increased scheduling-flexibility (recall "Risks of Chunksize>1").
For rather obvious reasons, Pool's basic chunksize-algorithm cannot weigh scheduling-flexibility against PO for us. IPC-overhead is OS-, hardware- and data-size dependent. The algorithm cannot know on what hardware we run our code, nor does it have a clue how long a taskel will take to finish. It's a heuristic providing basic functionality for all possible scenarios. This means it cannot be optimized for any scenario in particular. As mentioned before, PO also becomes increasingly less of a concern with increasing computation times per taskel (negative correlation).
When you recall the Parallelization Goals from chapter 2, one bullet-point was:
- high utilization across all cpu-cores
The previously mentioned something that Pool's chunksize-algorithm can try to improve is the minimization of idling worker-processes, or in other words, the utilization of cpu-cores.
A repeating question on SO regarding multiprocessing.Pool is asked by people wondering about unused cores / idling worker-processes in situations where you would expect all worker-processes busy. While this can have many reasons, idling worker-processes towards the end of a computation are an observation we can often make, even with Dense Scenarios (equal computation times per taskel) in cases where the number of workers is not a divisor of the number of chunks (n_chunks % n_workers > 0).

The question now is:
How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?
6.1 Models
For gaining deeper insights here, we need a form of abstraction of parallel computations which simplifies the overly complex reality down to a manageable degree of complexity, while preserving significance within defined boundaries. Such an abstraction is called a model. An implementation of such a "Parallelization Model" (PM) generates worker-mapped meta-data (timestamps) as real computations would, if the data were to be collected. The model-generated meta-data allows predicting metrics of parallel computations under certain constraints.
One of the two sub-models within the PM defined here is the Distribution Model (DM). The DM explains how atomic units of work (taskels) are distributed over parallel workers and time, when no other factors than the respective chunksize-algorithm, the number of workers, the input-iterable (number of taskels) and their computation duration are considered. This means any form of overhead is not included.
For obtaining a complete PM, the DM is extended with an Overhead Model (OM), representing various forms of Parallelization Overhead (PO). Such a model needs to be calibrated for each node individually (hardware-, OS-dependencies). How many forms of overhead are represented in an OM is left open, and so multiple OMs with varying degrees of complexity can exist. Which level of accuracy the implemented OM needs is determined by the overall weight of PO for the specific computation. Shorter taskels lead to a higher weight of PO, which in turn requires a more precise OM if we were attempting to predict Parallelization Efficiencies (PE).
6.2 Parallel Schedule (PS)
The Parallel Schedule is a two-dimensional representation of the parallel computation, where the x-axis represents time and the y-axis represents a pool of parallel workers. The number of workers and the total computation time mark the extent of a rectangle, in which smaller rectangles are drawn. These smaller rectangles represent atomic units of work (taskels).
Below you find the visualization of a PS drawn with data from the DM of Pool's chunksize-algorithm for the Dense Scenario.
- The x-axis is sectioned into equal units of time, where each unit stands for the computation time a taskel requires.
- The y-axis is divided into the number of worker-processes the pool uses.
- A taskel here is displayed as the smallest cyan-colored rectangle, put into a timeline (a schedule) of an anonymized worker-process.
- A task is one or multiple taskels in a worker-timeline continuously highlighted with the same hue.
- Idling time units are represented through red colored tiles.
- The Parallel Schedule is partitioned into sections. The last section is the tail-section.
The names for the composed parts can be seen in the picture below.
In a complete PM including an OM, the Idling Share is not limited to the tail, but also comprises space between tasks and even between taskels.
6.3 Efficiencies
The Models introduced above allow quantifying the rate of worker-utilization. We can distinguish:
- Distribution Efficiency (DE) - calculated with help of a DM (or a simplified method for the Dense Scenario).
- Parallelization Efficiency (PE) - either calculated with help of a calibrated PM (prediction) or calculated from meta-data of real computations.
It's important to note that calculated efficiencies do not automatically correlate with faster overall computation for a given parallelization problem. Worker-utilization in this context only distinguishes between a worker having a started, yet unfinished taskel and a worker not having such an "open" taskel. That means possible idling during the time span of a taskel is not registered.
All above mentioned efficiencies are basically obtained by calculating the quotient of the division Busy Share / Parallel Schedule. The difference between DE and PE comes with the Busy Share occupying a smaller portion of the overall Parallel Schedule for the overhead-extended PM.
This answer will further only discuss a simple method to calculate DE for the Dense Scenario. This is sufficiently adequate to compare different chunksize-algorithms, since...
- ... the DM is the part of the PM, which changes with different chunksize-algorithms employed.
- ... the Dense Scenario with equal computation durations per taskel depicts a "stable state", for which these time spans drop out of the equation. Any other scenario would just lead to random results since the ordering of taskels would matter.
6.3.1 Absolute Distribution Efficiency (ADE)
This basic efficiency can be calculated in general by dividing the Busy Share through the whole potential of the Parallel Schedule:
Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule
For the Dense Scenario, the simplified calculation-code looks like this:
```python
# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade
```
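A small usage example, feeding the Chunkinfo values from the earlier example (n_workers=4, len_iterable=500) into calc_ade:

```python
ci = calc_chunksize_info(n_workers=4, len_iterable=500)
print(round(calc_ade(*ci), 4))  # 0.9766 -> ~97.7% of the Parallel Schedule is busy
```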
If there is no Idling Share, Busy Share will be equal to Parallel Schedule, hence we get an ADE of 100%. In our simplified model, this is a scenario where all available processes will be busy through the whole time needed for processing all tasks. In other words, the whole job gets effectively parallelized to 100 percent.
But why do I keep referring to DE as absolute DE here?
To comprehend that, we have to consider a possible case for the chunksize (cs) which ensures maximal scheduling flexibility (also, the number of Highlanders there can be. Coincidence?):
__________________________________~ ONE ~__________________________________
If we, for example, have four worker-processes and 37 taskels, there will be idling workers even with chunksize=1, just because n_workers=4 is not a divisor of 37. The remainder of dividing 37 / 4 is 1. This single remaining taskel will have to be processed by a sole worker, while the remaining three are idling.

Likewise, there will still be one idling worker with 39 taskels, as you can see pictured below.
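To connect this with the code above, here is what calc_ade yields for the chunksize=1 cases just described, plus the chunksize=3 variant compared next (my own computation with the functions defined earlier):

```python
# 37 taskels, 4 workers, chunksize=1: 37 busy units in a 40-unit schedule.
print(round(calc_ade(4, 37, n_chunks=37, chunksize=1, last_chunk=1), 3))  # 0.925

# 39 taskels, 4 workers: chunksize=1 vs. Pool's chunksize of 3.
print(round(calc_ade(4, 39, n_chunks=39, chunksize=1, last_chunk=1), 3))  # 0.975
print(round(calc_ade(4, 39, n_chunks=13, chunksize=3, last_chunk=3), 3))  # 0.812
```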
When you compare the upper Parallel Schedule for chunksize=1 with the below version for chunksize=3, you will notice that the upper Parallel Schedule is smaller, the timeline on the x-axis shorter. It should become obvious now how bigger chunksizes unexpectedly also can lead to increased overall computation times, even for Dense Scenarios.

But why not just use the length of the x-axis for efficiency calculations?
Because the overhead is not contained in this model. It will be different for both chunksizes, hence the x-axis is not really directly comparable. The overhead can still lead to a longer total computation time like shown in case 2 from the figure below.
6.3.2 Relative Distribution Efficiency (RDE)
The ADE value does not contain the information if a better distribution of taskels is possible with chunksize set to 1. Better here still means a smaller Idling Share.
To get a DE value adjusted for the maximum possible DE, we have to divide the considered ADE through the ADE we get for chunksize=1.

Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1
Here is how this looks in code:
```python
# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde
```
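Continuing the example numbers from above:

```python
# 39 taskels, 4 workers, Pool's chunksize of 3:
# ADE of 0.8125 relative to the best achievable ADE of 0.975 with chunksize=1.
print(round(calc_rde(4, 39, n_chunks=13, chunksize=3, last_chunk=3), 3))  # 0.833
```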
RDE, as defined here, in essence is a tale about the tail of a Parallel Schedule. RDE is influenced by the maximum effective chunksize contained in the tail. (This tail can be of x-axis length chunksize or last_chunk.) This has the consequence that RDE naturally converges to 100% (even) for all sorts of "tail-looks", as shown in the figure below.

A low RDE ...
- is a strong hint for optimization potential.
- naturally gets less likely for longer iterables, because the relative tail-portion of the overall Parallel Schedule shrinks.
Please find Part II of this answer here.