等待任何未来的异步

问题描述

我正在尝试使用Asyncio来处理并发网络I/O。要在单个点调度非常多的功能,每个功能完成的时间差别很大。然后,将在每个输出的单独进程中处理接收到的数据。

处理数据的顺序无关紧要,因此考虑到输出可能会有很长的等待期,我希望await无论将来先完成什么,而不是按预定义的顺序。

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:
       await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

正常情况下,按期货排队顺序等待即可:

蓝色表示每个任务在Executor队列中的时间,即run_in_executor已调用,但函数尚未执行,因为Executor同时只运行5个任务;绿色表示函数本身执行所花费的时间;红色表示等待所有之前的未来await所花费的时间。

在我的情况下,函数在时间上变化很大,等待队列中的前一个期货等待会浪费很多时间,而我可以在本地处理GET OUTPUT。这会使我的系统空闲一段时间,直到几个输出同时完成时才会不堪重负,然后跳回空闲状态,等待更多请求完成。

是否有办法await无论将来在执行器中首先完成吗?


解决方案

看起来您正在查找asyncio.wait和return_when=asyncio.FIRST_COMPLETED

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        done, futures = await asyncio.wait(futures, 
            loop=loop, return_when=asyncio.FIRST_COMPLETED)  
        for f in done:
            await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

相关文章