根据参数使函数异步化

2022-03-25 00:00:00 python python-asyncio

问题描述

我有一个函数,它发出一个HTTP请求,然后返回响应。我希望这个函数能够根据参数在阻塞或非阻塞模式下运行。这在Python语言中是可能的吗?我想象的伪代码应该是这样的:

def maybe_async(asynchronous):
    if asynchronous:
        # We assume there's an event loop running and we can await
        return await send_async_http_request()
    else:
        # Perform a normal synchronous call
        return send_http_request()

这引发了SyntaxError,我希望找到一种方法来重新表述它,以便如果asynchronous=True但没有运行任何事件循环,则它在运行时引发RuntimeError: no running event loop


有两个评论说我只需从maybe_async中删除await关键字,但是我相信如果我们想要对响应进行后处理,那么这是不相关的。下面是一个更具体的用例:假设我想要向最终用户提供一个函数,该函数从GitHub API收集所有事件ID,并根据用户输入以阻塞或非阻塞模式执行此操作。以下是我想做的事情:

import aiohttp
import requests


async def get_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

def get_sync(url):
    return requests.get(url).json()

def get(url, asynchronous):
    if asynchronous:
        return get_async(url)  # This is the suggested edit where
                               # I removed the await keyword
    else:
        return get_sync(url)

def get_github_events_ids(asynchronous=False):
    url = 'https://api.github.com/events'
    body = get(url, asynchronous)
    return [event['id'] for event in body]

但明显运行get_github_events_ids(True)会引发TypeError: 'coroutine' object is not iterable

我的问题是:除了复制所有函数之外,是否还有其他代码设计允许在同步和异步之间进行选择?


解决方案

问题是,在Asyncio下,像get_github_events_ids这样的函数本身必须是异步的,或者返回通过调用(但不等待)异步函数获得的对象。这是为了允许他们在等待结果到达时挂起执行。

您可以为每个函数创建两个版本,一个用于等待,另一个用于运行代码,但这将导致大量代码重复。有一个更好的方法,但它需要一点魔法。

首先,在内部,代码必须始终使用异步,因为这是在异步情况下传播挂起的唯一方法。但是在同步的情况下,它可以只将同步调用返回的对象包装在可以等待的东西中(因为协例程只返回同步调用的结果),我们可以在顶层等待。我们称这个操作为"等待"。在顶层,在异步情况下,我们可以将协程对象返回给等待它的调用者,而在同步情况下,我们只需驱动协程完成,我们可以恰当地将该操作称为";drive";。

以下是get_github_events_ids将显示的内容:

def get_github_events_ids(asynchronous=False):
    coro = _get_github_events_ids_impl(asynchronous)
    if asynchronous:
        return coro         # let the caller await
    else:
        return drive(coro)  # get sync result from "coroutine"

该实现看起来始终是异步的:

async def _get_github_events_ids_impl(asynchronous):
    url = 'https://api.github.com/events'
    body = await awaitify(get(url, asynchronous))
    return [event['id'] for event in body]

# "get", "get_sync", and "get_async" remain exactly as
# in the question

现在我们只需要定义awaitifydrive幻函数:

def awaitify(obj):
    if isinstance(obj, types.CoroutineType):
        return obj  # nothing to do, we're already async
    # return an async shim that will just return the object
    async def _identity():
        return obj
    return _identity()

def drive(coro):
    # coro is not really async, so we don't need an event loop or
    # anything, we just drive the coroutine object to completion.
    # Don't try this at home!
    while True:
        try:
            coro.send(None)
        except StopIteration as done:
            return done.value

要测试它,只需双向运行:

if __name__ == '__main__':
    # test sync
    print(get_github_events_ids(False))
    # test async
    print(asyncio.run(get_github_events_ids(True)))

相关文章