如何在事件循环之外运行协同例程?

2022-03-25 00:00:00 python python-asyncio async-await

问题描述

通常,您可以通过执行以下操作来获取协同例程的结果:

async def coro():
    await asycnio.sleep(3)
    return 'a value'

loop = asyncio.get_event_loop()
value = loop.run_until_complete(coro())

出于好奇,不使用事件循环获取该值的最简单方法是什么?

[编辑]

我认为更简单的方法可以是:

async def coro():
    ...

value = asyncio.run(coro())  # Python 3.7+
但是,是否可以像JS中那样全局排序yield from(或await)acoro()?如果没有,为什么?


解决方案

这里有两个问题:一个是关于"在顶层"等待协程,或者更具体地说是关于在开发环境中等待协程。另一个是关于在没有事件循环的情况下运行协同例程。

关于第一个问题,这在Python中当然是可能的,就像在Chrome Canary Dev工具中一样-通过工具自身与事件循环的集成来处理它。事实上,IPython7.0及更高版本支持异步natively,您可以按照预期在顶层使用await coro()

关于第二个问题,很容易在没有事件循环的情况下驱动单个协程,但用处不是很大。让我们来研究一下原因。

调用协程函数时,它返回协程对象。此对象通过调用其send()方法启动和恢复。当协程决定挂起(因为它await是被阻塞的东西)时,send()将返回。当协程决定返回(因为它已经到达末尾或者因为它遇到了显式的return)时,它将引发一个StopIteration异常,并将value属性设置为返回值。考虑到这一点,单个协程的最小驱动程序可能如下所示:

def drive(c):
    while True:
        try:
            c.send(None)
        except StopIteration as e:
            return e.value

这对于简单的协同例程非常有效:

>>> async def pi():
...     return 3.14
... 
>>> drive(pi())
3.14

甚至更复杂的问题:

>>> async def plus(a, b):
...     return a + b
... 
>>> async def pi():
...     val = await plus(3, 0.14)
...     return val
... 
>>> drive(pi())
3.14

但是仍然缺少一些东西-上述协程都没有挂起它们的执行。当一个协同例程挂起时,它允许其他协同例程运行,这使得事件循环能够(似乎)同时执行多个协同例程。例如,asyncio有一个sleep()协程,该协程在等待时会在指定时间段内暂停执行:

async def wait(s):
    await asyncio.sleep(1)
    return s

>>> asyncio.run(wait("hello world"))
'hello world'      # printed after a 1-second pause

但是,drive无法执行此协同程序完成:

>>> drive(wait("hello world"))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in drive
  File "<stdin>", line 2, in wait
  File "/usr/lib/python3.7/asyncio/tasks.py", line 564, in sleep
    return await future
RuntimeError: await wasn't used with future

发生的情况是sleep()通过生成一个特殊的"未来"对象与事件循环通信。只有在设置了未来之后,才能恢复等待未来的协程。"真正的"事件循环将通过运行其他协同例程来做到这一点,直到将来完成为止。

要解决这个问题,我们可以编写自己的sleep实现来处理我们的迷你事件循环。为此,我们需要使用迭代器来实现可等待的:

class my_sleep:
    def __init__(self, d):
        self.d = d
    def __await__(self):
        yield 'sleep', self.d

我们产生一个协程调用者看不到的元组,但会告诉drive(我们的事件循环)要做什么。drivewait现在如下所示:

def drive(c):
    while True:
        try:
            susp_val = c.send(None)
            if susp_val is not None and susp_val[0] == 'sleep':
                time.sleep(susp_val[1])
        except StopIteration as e:
            return e.value

async def wait(s):
    await my_sleep(1)
    return s

使用此版本,wait可以正常工作:

>>> drive(wait("hello world"))
'hello world'
这仍然不是很有用,因为驱动协同例程的唯一方法是调用drive(),它同样支持单个协同例程。因此,我们可能已经编写了一个同步函数,它只调用time.sleep(),然后一天就调用它。要使我们的协同程序支持异步编程的用例,drive()需要:

  • 支持多个协程运行和挂起
  • 在驱动器循环中实现新协程的派生
  • 允许协同例程在IO相关事件(如文件描述符变为可读或可写)上注册唤醒,同时始终支持多个此类事件而不会降低性能

这就是异步事件循环为表带来的功能,以及许多其他特性。David Beazley在this talk中出色地演示了从头开始构建事件循环,他在现场观众面前实现了一个功能性事件循环。

相关文章