Python`的`concurent.futures`:根据完成的顺序迭代期货

2022-04-17 00:00:00 python multithreading future

问题描述

我想要类似于executor.map的内容,除了当我迭代结果时,我希望根据完成的顺序迭代它们,例如,最先完成的工作项应该首先出现在迭代中,等等。这是因为迭代将阻止当序列中的每个工作项尚未完成时。

我知道如何使用队列自己实现这一点,但我想知道是否可以使用futures框架。

(我主要使用基于线程的执行器,所以我想要一个适用于这些执行器的答案,但也欢迎通用的答案。)

更新:感谢您的回答!您能解释一下如何将as_completedexecutor.map一起使用吗?executor.map是我在使用Futures时最有用、最简洁的工具,我不愿意开始手动使用Future对象。


解决方案

executor.map()与内置map()一样,只返回可迭代顺序的结果,因此不幸的是,您不能使用它来确定完成的顺序。concurrent.futures.as_completed()是您要查找的内容-这里有一个示例:

import time
import concurrent.futures

times = [3, 1, 2]

def sleeper(secs):
    time.sleep(secs)
    print('I slept for {} seconds'.format(secs))
    return secs

# returns in the order given
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(sleeper, times)))

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [3, 1, 2]

# returns in the order completed
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futs = [executor.submit(sleeper, secs) for secs in times]
    print([fut.result() for fut in concurrent.futures.as_completed(futs)])

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [1, 2, 3]

当然,如果需要使用map接口,您可以创建自己的map_as_completed()函数来封装上面的内容(可能会将其添加到子类Executor()中),但我认为通过executor.submit()创建未来实例是一种更简单、更干净的方法(也允许您提供无参数、无参数)。

相关文章