在DASK中,有没有一种方法可以在依赖项变得可用时对其进行处理,就像在多进程.imap_unordered中一样?
问题描述
我有一个简单的图结构,它接受N个独立的任务,然后聚合它们。我不关心独立任务的结果是以什么顺序汇总的。是否有方法可以通过在依赖项可用时对其执行操作来加快计算速度?
请考虑以下示例。在其中,并行任务每个都等待一段随机时间,然后返回。另一个任务收集结果,形成有序队列。如果收集是异步进行的,则顺序将基于任务完成的时间。如果收集同步进行,则顺序将由输入静态定义。
from multiprocessing import Pool
from dask import delayed
import numpy as np
from time import sleep
def wait(i):
"""Something embarrassingly parallel"""
np.random.seed()
t = np.random.uniform()
sleep(t)
print(i, t)
return i, t
def lineup(who_when):
"""Aggregate"""
order = []
for who, when in who_when:
print(f'who: {who}')
order.append(who)
return order
使用IMAP_UNORDERED,我们看到收集/减少在所有依赖项完成之前尽快开始。
n = 5
pool = Pool(processes=n)
lineup(pool.imap_unordered(wait, range(n)))
# Produces something like the following
2 0.2837069069881948
4 0.44156753704276597
who: 2
who: 4
1 0.5563172244950703
0 0.6696008076879393
who: 1
who: 0
3 0.9911326214345308
who: 3
[2, 4, 1, 0, 3]
按照我习惯的方式使用dask.delayed,结果类似于map(),一旦所有依赖项都可用,收集就开始了。该顺序是静态的。
n = 5
order = delayed(lineup)([delayed(wait)(i) for i in range(n)])
order.compute()
# produces something like:
0 0.2792789023871932
2 0.44570072028850705
4 0.6969597596416385
1 0.766705306208266
3 0.9889956337687371
who: 0
who: 1
who: 2
who: 3
who: 4
[0, 1, 2, 3, 4]
DASK中是否有IMAP_UNORDERED等效项?也许是使用dask.Bag的东西?
解决方案
是。您可能正在寻找Dask Futures interface的as_completed函数。
在Handling Evolving Workflows
上有一个DASK示例为方便起见,我将在此处复制AS_Complete的文档字符串
已完成
按期货的完成顺序返回
这将返回一个迭代器,该迭代器按照输入的未来对象完成的顺序生成这些对象。无论顺序如何,在迭代器上调用Next将一直阻止,直到下一个将来完成。
此外,您还可以在使用.Add方法进行计算期间向此对象添加更多未来
参数
期货:期货集合 要按完成顺序迭代的Future对象的列表
WITH_RESULTS:Bool(False) 是否等待并同时包含期货的结果;在这种情况下,as_Complete将生成(Future,Result)的元组
RAISE_ERROR:布尔值(True) 当将来的结果引发异常时是否应引发异常;仅在WITH_RESULTS=True时影响行为。示例
>>> x, y, z = client.map(inc, [1, 2, 3]) # doctest: +SKIP
>>> for future in as_completed([x, y, z]): # doctest: +SKIP
... print(future.result()) # doctest: +SKIP
3
2
4
在计算过程中添加更多期货
>>> x, y, z = client.map(inc, [1, 2, 3]) # doctest: +SKIP
>>> ac = as_completed([x, y, z]) # doctest: +SKIP
>>> for future in ac: # doctest: +SKIP
... print(future.result()) # doctest: +SKIP
... if random.random() < 0.5: # doctest: +SKIP
... ac.add(c.submit(double, future)) # doctest: +SKIP
4
2
8
3
6
12
24
也可以选择等待,直到收集到结果
>>> ac = as_completed([x, y, z], with_results=True) # doctest: +SKIP
>>> for future, result in ac: # doctest: +SKIP
... print(result) # doctest: +SKIP
2
4
3
相关文章