事件循环中的共享队列

2022-03-25 00:00:00 python python-asyncio

问题描述

是否可以在一个事件循环中跨不同任务共享Asyncio.Queue?

用例:

两个任务在队列上发布数据,一个任务从队列中获取新项目。以异步方式执行所有任务。

main.py

import asyncio
import creator


async def pull_message(queue):
    while True:
        # Here I dont get messages, maybe the queue is always
        # occupied by a other task? 
        msg = await queue.get()
        print(msg)

if __name__ == "__main__"
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(loop=loop)
    future = asyncio.ensure_future(pull_message(queue))

    creators = list()
    for i in range(2):
        creators.append(loop.create_task(cr.populate_msg(queue)))

    # add future to creators for easy handling
    creators.append(future)
    loop.run_until_complete(asyncio.gather(*creators))

creator.py

import asyncio

async def populate_msg(queue):
    while True:
        msg = "Foo"
        await queue.put(msg)

解决方案

您代码中的问题是populate_msg不会屈服于事件循环,因为队列是无界的。这有点违反直觉,因为协同例程明显包含await,但是await只有在协同例程否则会挡路的情况下才会暂停协同例程的执行。由于无界队列上的put()从不阻塞,因此populate_msg是事件循环唯一执行的操作。

一旦您将populate_msg更改为实际执行其他操作(如等待网络事件),问题就会消失。出于测试目的,您可以在循环内添加await asyncio.sleep(0),这将强制协程在while循环的每次迭代时将控制权让给事件循环。请注意,这将导致事件循环通过连续旋转循环来消耗整个核心。

相关文章