Asyncio.run_coroutine_ThreadSafe的未来是否永远挂起?
问题描述
作为我的previous question about calling an async function from a synchronous one的后续工作,我发现asyncio.run_coroutine_threadsafe
。
从纸面上看,这看起来很理想。根据this StackOverflow question中的评论,这看起来很理想。我可以创建一个新的线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时只阻塞新的线程。
class _AsyncBridge:
def call_async_method(self, function, *args, **kwargs):
print(f"call_async_method {threading.get_ident()}")
event_loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor()
return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()
async def _async_wrapper(self, event_loop, function, *args, **kwargs):
print(f"async_wrapper {threading.get_ident()}")
future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
return future.result()
这不会出错,但也不会返回。期货只是挂起,而异步呼叫永远不会被击中。在call_async_method
、_async_wrapper
或两者中使用Future似乎都无关紧要;无论我在哪里使用Future,它都会挂起。
我尝试将run_coroutine_threadsafe
调用直接放入我的主事件循环:
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
这里也是如此,未来悬而未决。
我尝试使用LoopExecutor
定义的here类,它似乎是满足我需要的确切答案。
event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
那里也挂着《回归的未来》。
我认为我阻塞了原始的事件循环,因此计划的任务永远不会运行,因此我创建了一个新的事件循环:
event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value
仍然挂在future.result()
,我不知道为什么。
asyncio.run_coroutine_threadsafe
/我使用它的方式有什么问题?
解决方案
我认为有两个问题。第一个问题是run_coroutine_threadsafe
只提交协程,而不真正运行它。
所以
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
不起作用,因为您从未运行过此循环。
要让它工作,理论上可以使用asyncio.run(future)
,但实际上您不能使用,可能是因为它是由run_coroutine_threadsafe
提交的。以下方法将起作用:
import asyncio
async def stop():
await asyncio.sleep(3)
event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
event_loop.run_until_complete(stop())
print(future.result())
第二个问题是,我想您已经注意到您的结构在某种程度上颠倒了过来。您应该在单独的线程中运行事件循环,但从主线程提交任务。如果您在单独的线程中提交它,您仍然需要在主线程中运行事件循环才能实际执行它。大多数情况下,我建议只在单独的线程中创建另一个事件循环。
相关文章