事件循环中的共享队列
问题描述
是否可以在一个事件循环中跨不同任务共享Asyncio.Queue?
用例:
两个任务在队列上发布数据,一个任务从队列中获取新项目。以异步方式执行所有任务。
main.py
import asyncio
import creator
async def pull_message(queue):
while True:
# Here I dont get messages, maybe the queue is always
# occupied by a other task?
msg = await queue.get()
print(msg)
if __name__ == "__main__"
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
future = asyncio.ensure_future(pull_message(queue))
creators = list()
for i in range(2):
creators.append(loop.create_task(cr.populate_msg(queue)))
# add future to creators for easy handling
creators.append(future)
loop.run_until_complete(asyncio.gather(*creators))
creator.py
import asyncio
async def populate_msg(queue):
while True:
msg = "Foo"
await queue.put(msg)
解决方案
您代码中的问题是populate_msg
不会屈服于事件循环,因为队列是无界的。这有点违反直觉,因为协同例程明显包含await
,但是await
只有在协同例程否则会挡路的情况下才会暂停协同例程的执行。由于无界队列上的put()
从不阻塞,因此populate_msg
是事件循环唯一执行的操作。
populate_msg
更改为实际执行其他操作(如等待网络事件),问题就会消失。出于测试目的,您可以在循环内添加await asyncio.sleep(0)
,这将强制协程在while
循环的每次迭代时将控制权让给事件循环。请注意,这将导致事件循环通过连续旋转循环来消耗整个核心。
相关文章