正确使用Asyncio.Condition的WAIT_FOR()方法

2022-03-25 00:00:00 python python-3.x python-asyncio

问题描述

我正在使用Python的asyncio模块编写一个项目,我想使用它的同步原语来同步我的任务。但是,它的行为似乎并不像我预期的那样。

从文档中看,Condition.wait_for()似乎提供了一种允许协程等待特定的用户定义条件求值为TRUE的方法。然而,在尝试使用该方法时,它的行为似乎是我意想不到的-我的条件只检查一次,如果发现它是假的,等待的任务就会永远挂起,再也不会检查一次。我在下面写了一个简短的示例来演示我正在尝试做的事情:

#!/usr/bin/env python

import asyncio

thing = False

setter_done = None
getter_done = None

async def main():

    setter_done = asyncio.Event()
    getter_done = asyncio.Event()

    setter = asyncio.ensure_future(set_thing())
    getter = asyncio.ensure_future(get_thing())

    #To avoid the loop exiting prematurely:
    await setter_done.wait()
    await getter_done.wait()

async def set_thing():

    global thing
    global setter_done

    thing = False
    #sleep for some arbitrary amount of time; simulate work happening
    await asyncio.sleep(10)
    thing = True

    print("Thing was set to True!")
    setter_done.set()

async def get_thing():

    global thing
    global getter_done

    def check_thing():
        print("Checking...")
        return thing

    c = asyncio.Condition()
    await c.acquire()
    await c.wait_for(check_thing)
    c.release()
    print("Thing was found to be true!")
    getter_done.set()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

我预计这将打印类似以下内容的内容:

Checking...
Thing was set to True!
Checking...
Thing was found to be True!

相反,我得到:

Checking...
Thing was set to True!
... (hangs indefinitely)

解决方案

我发布了一个完整的答案,其中包含许多评论,以帮助这些人解决类似的问题。我已经将代码示例更改为使用类而不是全局变量。有点长,但我希望不会太复杂。

基本上Command类表示一个任务。它是异步的,所以它可以做很多事情。在我的例子中,我只创建了两个伪命令(read";Two Tasks";),一个暂停5秒,另一个暂停8秒,然后等待它们都以一个条件结束。显然,条件不是我所做的事情的唯一方法,但为了与最初的答案保持一致,我认为提供一个完全有效的示例是很有趣的。那就开始吧!

import asyncio
from typing import Set

class Command:

    """A command, an asynchronous task, imagine an asynchronous action."""

    async def run(self):
        """To be defined in sub-classes."""
        pass

    async def start(self, condition: asyncio.Condition,
            commands: Set['Command']):
        """
        Start the task, calling run asynchronously.

        This method also keeps track of the running commands.

        """
        commands.add(self)
        await self.run()
        commands.remove(self)

        # At this point, we should ask the condition to update
        # as the number of running commands might have reached 0.
        async with condition:
            condition.notify()

class Look(Command):

    """A subclass of a command, running a dummy task."""

    async def run(self):
        print("Before looking...")
        await asyncio.sleep(5)
        print("After looking")

class Scan(Command):

    """A subclass of a command, running a dummy task."""

    async def run(self):
        print("Before scanning...")
        await asyncio.sleep(8)
        print("After scanning")

async def main():
    """Our main coroutine, starting commands."""
    condition = asyncio.Condition()
    commands = set()
    commands.add(Look())
    commands.add(Scan())
    asyncio.gather(*(cmd.start(condition, commands) for cmd in commands))

    # Wait for the number of commands to reach 0
    async with condition:
        await condition.wait_for(lambda: len(commands) == 0)
        print("There's no running command now, exiting.")

asyncio.run(main())
因此,在实践中(通常从末尾开始),我们将main称为协程。在main中,我们创建两个命令LookScan,并调用它们的start方法。start方法是在每个命令上定义的,它基本上负责在命令运行之前将命令本身写入到一个集合中,并在运行后(即完全完成后)删除它。然后它应该通知条件再次检查命令的长度。当没有剩余命令时,程序结束。如果您运行此脚本(我使用Python3.8运行它),您应该会看到类似以下内容:

Before scanning...
Before looking...
After looking
After scanning
There's no running command now, exiting.
请注意,这两个命令同时启动(实际上,Look在稍早开始,但仍然是ScanLook完成之前开始)。但是Look确实比Scan结束(大约3秒)。在两个命令都完成之前,不会检查我们的状况。

是否可以改用事件、锁或信号量?有可能,但我喜欢在那个例子中使用条件。您可以轻松地拥有更多任务,而无需进行大量修改。

相关文章