Asyncio.run_coroutine_ThreadSafe的未来是否永远挂起?

2022-03-25 00:00:00 python python-3.x python-asyncio

问题描述

作为我的previous question about calling an async function from a synchronous one的后续工作,我发现asyncio.run_coroutine_threadsafe

从纸面上看,这看起来很理想。根据this StackOverflow question中的评论,这看起来很理想。我可以创建一个新的线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时只阻塞新的线程。

class _AsyncBridge:
    def call_async_method(self, function, *args, **kwargs):
        print(f"call_async_method {threading.get_ident()}")
        event_loop = asyncio.get_event_loop()
        thread_pool = ThreadPoolExecutor()
        return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()

    async def _async_wrapper(self, event_loop, function, *args, **kwargs):
        print(f"async_wrapper {threading.get_ident()}")
        future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
        return future.result()

这不会出错,但也不会返回。期货只是挂起,而异步呼叫永远不会被击中。在call_async_method_async_wrapper或两者中使用Future似乎都无关紧要;无论我在哪里使用Future,它都会挂起。

我尝试将run_coroutine_threadsafe调用直接放入我的主事件循环:

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

这里也是如此,未来悬而未决。

我尝试使用LoopExecutor定义的here类,它似乎是满足我需要的确切答案。

event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()

那里也挂着《回归的未来》。

我认为我阻塞了原始的事件循环,因此计划的任务永远不会运行,因此我创建了一个新的事件循环:

event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value

仍然挂在future.result(),我不知道为什么。

asyncio.run_coroutine_threadsafe/我使用它的方式有什么问题?


解决方案

我认为有两个问题。第一个问题是run_coroutine_threadsafe只提交协程,而不真正运行它。

所以

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

不起作用,因为您从未运行过此循环。

要让它工作,理论上可以使用asyncio.run(future),但实际上您不能使用,可能是因为它是由run_coroutine_threadsafe提交的。以下方法将起作用:

import asyncio

async def stop():
    await asyncio.sleep(3)

event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
event_loop.run_until_complete(stop())
print(future.result())

第二个问题是,我想您已经注意到您的结构在某种程度上颠倒了过来。您应该在单独的线程中运行事件循环,但从主线程提交任务。如果您在单独的线程中提交它,您仍然需要在主线程中运行事件循环才能实际执行它。大多数情况下,我建议只在单独的线程中创建另一个事件循环。

相关文章