只要还有未完成的取消屏蔽任务但不再存在,我如何运行异步循环?
问题描述
我正在尝试将一些代码添加到我现有的异步循环中,以便在Ctrl-C上提供干净的关闭。下面是它正在做的事情的抽象。import asyncio, signal
async def task1():
print("Starting simulated task1")
await asyncio.sleep(5)
print("Finished simulated task1")
async def task2():
print("Starting simulated task2")
await asyncio.sleep(5)
print("Finished simulated task2")
async def tasks():
await task1()
await task2()
async def task_loop():
try:
while True:
await asyncio.shield(tasks())
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Shutting down task loop")
raise
async def aiomain():
loop = asyncio.get_running_loop()
task = asyncio.Task(task_loop())
loop.add_signal_handler(signal.SIGINT, task.cancel)
await task
def main():
try:
asyncio.run(aiomain())
except asyncio.CancelledError:
pass
#def main():
# try:
# loop = asyncio.get_event_loop()
# loop.create_task(aiomain())
# loop.run_forever()
# except asyncio.CancelledError:
# pass
if __name__ == '__main__':
main()
在本例中,假设task1
和task2
序列在启动后需要完成,否则一些工件将处于不一致的状态。(因此调用tasks
时使用asyncio.shield
包装。)
使用上面的代码,如果我在脚本启动后不久中断脚本,而它只是打印Starting simulated task1
,那么循环就会停止,task2
永远不会启动。如果我尝试切换到被注释掉的main
版本,则该版本永远不会退出,即使循环已正确取消并且至少在几分钟内没有任何进一步的反应。它确实有一些进展,因为它至少完成了task1
和task2
的任何正在进行的序列。
集思广益的一些可能的解决方案,但我仍然觉得我肯定遗漏了一些更简单的东西:
- 创建一个围绕
asyncio.shield
的包装器,该包装器递增由asyncio.Condition
对象同步的变量,运行屏蔽函数,然后递减该变量。然后,在aiomain
中的CancelledError
处理程序中,等待变量达到零,然后再重新执行异常。(在一个实现中,我可能会建议使用__aexit__
实现CancelledError
上的等待零逻辑将这个类的所有部分合并到一个类中。) - 完全跳过使用
asyncio
的取消机制,转而使用asyncio.Event
或类似的机制以允许中断点或可中断睡眠。尽管这看起来似乎更具侵入性,需要我指定哪些点被认为是可中断的,而不是声明哪些序列需要禁止取消。
解决方案
这是一个非常好的问题。我在计算答案的过程中学到了一些东西,所以我希望您仍在关注此帖子。
首先要研究的是Shield()方法是如何工作的?在这一点上,至少可以说,医生们令人困惑。直到我阅读了test_tasks.py中的标准库测试代码,我才明白这一点。以下是我的理解:
考虑以下代码片段:
async def coro_a():
await asyncio.sheild(task_b())
...
task_a = asyncio.create_task(coro_a())
task_a.cancel()
当执行TASK_a.ancel()语句时,TASK_a确实被取消。AWAIT语句立即抛出CancelledError,而不等待TASK_b完成。但是TASK_b继续运行。外部任务(A)停止,但内部任务(B)未停止。
以下是说明这一点的程序的修改版本。主要的更改是在CancelledError异常处理程序中插入一个等待,以使您的程序多活几秒钟。我在Windows上运行,这就是为什么我也稍微更改了您的信号处理程序,但这是一个次要问题。我还在打印语句中添加了时间戳。import asyncio
import signal
import time
async def task1():
print("Starting simulated task1", time.time())
await asyncio.sleep(5)
print("Finished simulated task1", time.time())
async def task2():
print("Starting simulated task2", time.time())
await asyncio.sleep(5)
print("Finished simulated task2", time.time())
async def tasks():
await task1()
await task2()
async def task_loop():
try:
while True:
await asyncio.shield(tasks())
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Shutting down task loop", time.time())
raise
async def aiomain():
task = asyncio.create_task(task_loop())
KillNicely(task)
try:
await task
except asyncio.CancelledError:
print("Caught CancelledError", time.time())
await asyncio.sleep(5.0)
raise
class KillNicely:
def __init__(self, cancel_me):
self.cancel_me = cancel_me
self.old_sigint = signal.signal(signal.SIGINT,
self.trap_control_c)
def trap_control_c(self, signum, stack):
if signum != signal.SIGINT:
self.old_sigint(signum, stack)
else:
print("Got Control-C", time.time())
print(self.cancel_me.cancel())
def main():
try:
asyncio.run(aiomain())
except asyncio.CancelledError:
print("Program exit, cancelled", time.time())
# Output when ctrlC is struck during task1
#
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846
if __name__ == '__main__':
main()
您可以看到您的程序没有工作,因为它在TASK1和Task2有机会完成之前一取消TASK_LOOP就退出了。它们一直都在那里(或者更确切地说,如果程序继续运行,它们就会在那里)。
这说明了Shield()和Cancel()是如何交互的,但它实际上并没有解决您所说的问题。为此,我认为,您需要有一个可等待的对象,以便在重要任务完成之前保持程序的生命力。这个对象需要在顶层创建,并沿着堆栈向下传递到执行重要任务的位置。这是一个与您的程序类似的程序,但以您想要的方式执行。
我执行了三次运行:(1)在任务1期间按Control-C,(2)在任务2期间按Control-C,(3)在两个任务都完成后按Control-C。在前两种情况下,程序一直持续到任务2完成。在第三种情况下,它立即结束。import asyncio
import signal
import time
async def task1():
print("Starting simulated task1", time.time())
await asyncio.sleep(5)
print("Finished simulated task1", time.time())
async def task2():
print("Starting simulated task2", time.time())
await asyncio.sleep(5)
print("Finished simulated task2", time.time())
async def tasks(kwrap):
fut = asyncio.get_running_loop().create_future()
kwrap.awaitable = fut
await task1()
await task2()
fut.set_result(1)
async def task_loop(kwrap):
try:
while True:
await asyncio.shield(tasks(kwrap))
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Shutting down task loop", time.time())
raise
async def aiomain():
kwrap = KillWrapper()
task = asyncio.create_task(task_loop(kwrap))
KillNicely(task)
try:
await task
except asyncio.CancelledError:
print("Caught CancelledError", time.time())
await kwrap.awaitable
raise
class KillNicely:
def __init__(self, cancel_me):
self.cancel_me = cancel_me
self.old_sigint = signal.signal(signal.SIGINT,
self.trap_control_c)
def trap_control_c(self, signum, stack):
if signum != signal.SIGINT:
self.old_sigint(signum, stack)
else:
print("Got Control-C", time.time())
print(self.cancel_me.cancel())
class KillWrapper:
def __init__(self):
self.awaitable = asyncio.get_running_loop().create_future()
self.awaitable.set_result(0)
def main():
try:
asyncio.run(aiomain())
except asyncio.CancelledError:
print("Program exit, cancelled", time.time())
# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 1 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 1 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824
相关文章