根据参数使函数异步化
问题描述
我有一个函数,它发出一个HTTP请求,然后返回响应。我希望这个函数能够根据参数在阻塞或非阻塞模式下运行。这在Python语言中是可能的吗?我想象的伪代码应该是这样的:
def maybe_async(asynchronous):
if asynchronous:
# We assume there's an event loop running and we can await
return await send_async_http_request()
else:
# Perform a normal synchronous call
return send_http_request()
这引发了SyntaxError
,我希望找到一种方法来重新表述它,以便如果asynchronous=True
但没有运行任何事件循环,则它在运行时引发RuntimeError: no running event loop
。
有两个评论说我只需从
maybe_async
中删除await
关键字,但是我相信如果我们想要对响应进行后处理,那么这是不相关的。下面是一个更具体的用例:假设我想要向最终用户提供一个函数,该函数从GitHub API收集所有事件ID,并根据用户输入以阻塞或非阻塞模式执行此操作。以下是我想做的事情:
import aiohttp
import requests
async def get_async(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
def get_sync(url):
return requests.get(url).json()
def get(url, asynchronous):
if asynchronous:
return get_async(url) # This is the suggested edit where
# I removed the await keyword
else:
return get_sync(url)
def get_github_events_ids(asynchronous=False):
url = 'https://api.github.com/events'
body = get(url, asynchronous)
return [event['id'] for event in body]
但明显运行get_github_events_ids(True)
会引发TypeError: 'coroutine' object is not iterable
。
我的问题是:除了复制所有函数之外,是否还有其他代码设计允许在同步和异步之间进行选择?
解决方案
问题是,在Asyncio下,像get_github_events_ids
这样的函数本身必须是异步的,或者返回通过调用(但不等待)异步函数获得的对象。这是为了允许他们在等待结果到达时挂起执行。
首先,在内部,代码必须始终使用异步,因为这是在异步情况下传播挂起的唯一方法。但是在同步的情况下,它可以只将同步调用返回的对象包装在可以等待的东西中(因为协例程只返回同步调用的结果),我们可以在顶层等待。我们称这个操作为"等待"。在顶层,在异步情况下,我们可以将协程对象返回给等待它的调用者,而在同步情况下,我们只需驱动协程完成,我们可以恰当地将该操作称为";drive";。
以下是get_github_events_ids
将显示的内容:
def get_github_events_ids(asynchronous=False):
coro = _get_github_events_ids_impl(asynchronous)
if asynchronous:
return coro # let the caller await
else:
return drive(coro) # get sync result from "coroutine"
该实现看起来始终是异步的:
async def _get_github_events_ids_impl(asynchronous):
url = 'https://api.github.com/events'
body = await awaitify(get(url, asynchronous))
return [event['id'] for event in body]
# "get", "get_sync", and "get_async" remain exactly as
# in the question
现在我们只需要定义awaitify
和drive
幻函数:
def awaitify(obj):
if isinstance(obj, types.CoroutineType):
return obj # nothing to do, we're already async
# return an async shim that will just return the object
async def _identity():
return obj
return _identity()
def drive(coro):
# coro is not really async, so we don't need an event loop or
# anything, we just drive the coroutine object to completion.
# Don't try this at home!
while True:
try:
coro.send(None)
except StopIteration as done:
return done.value
要测试它,只需双向运行:
if __name__ == '__main__':
# test sync
print(get_github_events_ids(False))
# test async
print(asyncio.run(get_github_events_ids(True)))
相关文章