正确使用Asyncio.Condition的WAIT_FOR()方法
问题描述
我正在使用Python的asyncio
模块编写一个项目,我想使用它的同步原语来同步我的任务。但是,它的行为似乎并不像我预期的那样。
从文档中看,Condition.wait_for()
似乎提供了一种允许协程等待特定的用户定义条件求值为TRUE的方法。然而,在尝试使用该方法时,它的行为似乎是我意想不到的-我的条件只检查一次,如果发现它是假的,等待的任务就会永远挂起,再也不会检查一次。我在下面写了一个简短的示例来演示我正在尝试做的事情:
#!/usr/bin/env python
import asyncio
thing = False
setter_done = None
getter_done = None
async def main():
setter_done = asyncio.Event()
getter_done = asyncio.Event()
setter = asyncio.ensure_future(set_thing())
getter = asyncio.ensure_future(get_thing())
#To avoid the loop exiting prematurely:
await setter_done.wait()
await getter_done.wait()
async def set_thing():
global thing
global setter_done
thing = False
#sleep for some arbitrary amount of time; simulate work happening
await asyncio.sleep(10)
thing = True
print("Thing was set to True!")
setter_done.set()
async def get_thing():
global thing
global getter_done
def check_thing():
print("Checking...")
return thing
c = asyncio.Condition()
await c.acquire()
await c.wait_for(check_thing)
c.release()
print("Thing was found to be true!")
getter_done.set()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我预计这将打印类似以下内容的内容:
Checking...
Thing was set to True!
Checking...
Thing was found to be True!
相反,我得到:
Checking...
Thing was set to True!
... (hangs indefinitely)
解决方案
我发布了一个完整的答案,其中包含许多评论,以帮助这些人解决类似的问题。我已经将代码示例更改为使用类而不是全局变量。有点长,但我希望不会太复杂。
基本上Command
类表示一个任务。它是异步的,所以它可以做很多事情。在我的例子中,我只创建了两个伪命令(read";Two Tasks";),一个暂停5秒,另一个暂停8秒,然后等待它们都以一个条件结束。显然,条件不是我所做的事情的唯一方法,但为了与最初的答案保持一致,我认为提供一个完全有效的示例是很有趣的。那就开始吧!
import asyncio
from typing import Set
class Command:
"""A command, an asynchronous task, imagine an asynchronous action."""
async def run(self):
"""To be defined in sub-classes."""
pass
async def start(self, condition: asyncio.Condition,
commands: Set['Command']):
"""
Start the task, calling run asynchronously.
This method also keeps track of the running commands.
"""
commands.add(self)
await self.run()
commands.remove(self)
# At this point, we should ask the condition to update
# as the number of running commands might have reached 0.
async with condition:
condition.notify()
class Look(Command):
"""A subclass of a command, running a dummy task."""
async def run(self):
print("Before looking...")
await asyncio.sleep(5)
print("After looking")
class Scan(Command):
"""A subclass of a command, running a dummy task."""
async def run(self):
print("Before scanning...")
await asyncio.sleep(8)
print("After scanning")
async def main():
"""Our main coroutine, starting commands."""
condition = asyncio.Condition()
commands = set()
commands.add(Look())
commands.add(Scan())
asyncio.gather(*(cmd.start(condition, commands) for cmd in commands))
# Wait for the number of commands to reach 0
async with condition:
await condition.wait_for(lambda: len(commands) == 0)
print("There's no running command now, exiting.")
asyncio.run(main())
因此,在实践中(通常从末尾开始),我们将main
称为协程。在main
中,我们创建两个命令Look
和Scan
,并调用它们的start
方法。start
方法是在每个命令上定义的,它基本上负责在命令运行之前将命令本身写入到一个集合中,并在运行后(即完全完成后)删除它。然后它应该通知条件再次检查命令的长度。当没有剩余命令时,程序结束。如果您运行此脚本(我使用Python3.8运行它),您应该会看到类似以下内容:
Before scanning...
Before looking...
After looking
After scanning
There's no running command now, exiting.
请注意,这两个命令同时启动(实际上,Look
在稍早开始,但仍然是Scan
在Look
完成之前开始)。但是Look
确实比Scan
结束(大约3秒)。在两个命令都完成之前,不会检查我们的状况。
是否可以改用事件、锁或信号量?有可能,但我喜欢在那个例子中使用条件。您可以轻松地拥有更多任务,而无需进行大量修改。
相关文章