从'Call_Soon_ThreadSafe'中收集返回值

问题描述

我正在尝试理解python asyncio的call_soon_threadsafe接口,但失败了,使用下面的示例代码,如果我的simple协程想要返回一些东西,我应该如何从调用方获取返回值?

import time
import asyncio as aio
import uvloop

from threading import Thread

aio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def simple(a, fut:aio.Future):
  await aio.sleep(a)
  return fut.set_result(a)

def delegator(loop):
  aio.set_event_loop(loop)
  loop.run_forever()

loop_exec = aio.new_event_loop()

t = Thread(target=delegator, args=(loop_exec,))
t.start()


if __name__ == '__main__':
  start_time = time.time()

  fut = loop_exec.create_future() # tried to get back returned value by future
  handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
  res = aio.wait_for(fut, 10)

  print('Time consumed: {}s'.format(time.time() - start_time))
  print('>>>>>>>>>>', res)

# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>

如您所见,我试图通过将未来传递给在不同线程中运行的协程来取回返回值,但仍不知道如何正确获取它。

基本上有两个问题:

  1. 使用上面的示例代码,我如何从调用方取回返回值?
  2. 这个call_soon_threadsafe的实际用例是什么,只是觉得run_coroutine_threadsafe使用起来更方便,几乎可以涵盖我在这种不同线程协同程序交互中能想象到的所有情况。

解决方案

使用上面的示例代码,我如何从调用方取回返回值?

由于事件循环在主线程之外运行,您需要使用线程感知的同步设备。例如:

async def simple(a, event):
    await asyncio.sleep(a)
    event.simple_result = a
    event.set()

done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result

或者,您可以使用concurrent.futures.Future进行同步,这类似于具有对象有效负载的一次性事件。(请注意,您不能使用异步将来,因为它是not thread-safe。)

async def simple(a, fut):
    await asyncio.sleep(a)
    fut.set_result(a)

done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)

正如文森特在评论中指出的,这是run_coroutine_threadsafe将为您做的事情:

async def simple(a):
    await asyncio.sleep(a)
    return a

fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)

这个call_soon_threadsafe

的实际用例是什么

最简单的答案是call_soon_threadsafe是一个较低级别的API,当您只想告诉事件循环执行或开始执行某项操作时使用它。call_soon_threadsafe是用于实现run_coroutine_threadsafe等功能以及许多其他功能的构建块。您为什么要自己使用该管道功能...

有时您希望执行普通函数,而不是协程。有时,您的函数是"即火即忘",您并不关心它的返回值。(或者,该函数最终会通过某个旁路通知您它的完成情况。)在这些情况下,call_soon_threadsafe是作业的正确工具,因为它更轻量级,因为它不会尝试创建额外的concurrent.futures.Future并将其附加到执行的代码。示例:

  • loop.call_soon_threadsafe(loop.stop)通知事件循环停止运行
  • loop.call_soon_threadsafe(queue.put_nowait, some_item)向无界异步队列添加内容
  • loop.call_soon_threadsafe(asyncio.create_task, coroutinefn())向事件循环提交协程而不等待其完成
  • loop.call_soon_threadsafe(some_future.set_result, value)从其他线程设置异步将来的结果
  • this answer
  • 中的低级代码

相关文章