只要还有未完成的取消屏蔽任务但不再存在,我如何运行异步循环?

2022-03-25 00:00:00 python python-asyncio

问题描述

我正在尝试将一些代码添加到我现有的异步循环中,以便在Ctrl-C上提供干净的关闭。下面是它正在做的事情的抽象。

import asyncio, signal

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

#def main():
#    try:
#        loop = asyncio.get_event_loop()
#        loop.create_task(aiomain())
#        loop.run_forever()
#    except asyncio.CancelledError:
#        pass

if __name__ == '__main__':
    main()

在本例中,假设task1task2序列在启动后需要完成,否则一些工件将处于不一致的状态。(因此调用tasks时使用asyncio.shield包装。)

使用上面的代码,如果我在脚本启动后不久中断脚本,而它只是打印Starting simulated task1,那么循环就会停止,task2永远不会启动。如果我尝试切换到被注释掉的main版本,则该版本永远不会退出,即使循环已正确取消并且至少在几分钟内没有任何进一步的反应。它确实有一些进展,因为它至少完成了task1task2的任何正在进行的序列。

集思广益的一些可能的解决方案,但我仍然觉得我肯定遗漏了一些更简单的东西:

  • 创建一个围绕asyncio.shield的包装器,该包装器递增由asyncio.Condition对象同步的变量,运行屏蔽函数,然后递减该变量。然后,在aiomain中的CancelledError处理程序中,等待变量达到零,然后再重新执行异常。(在一个实现中,我可能会建议使用__aexit__实现CancelledError上的等待零逻辑将这个类的所有部分合并到一个类中。)
  • 完全跳过使用asyncio的取消机制,转而使用asyncio.Event或类似的机制以允许中断点或可中断睡眠。尽管这看起来似乎更具侵入性,需要我指定哪些点被认为是可中断的,而不是声明哪些序列需要禁止取消。

解决方案

这是一个非常好的问题。我在计算答案的过程中学到了一些东西,所以我希望您仍在关注此帖子。

首先要研究的是Shield()方法是如何工作的?在这一点上,至少可以说,医生们令人困惑。直到我阅读了test_tasks.py中的标准库测试代码,我才明白这一点。以下是我的理解:

考虑以下代码片段:

async def coro_a():
    await asyncio.sheild(task_b())
    ...
task_a = asyncio.create_task(coro_a())
task_a.cancel()

当执行TASK_a.ancel()语句时,TASK_a确实被取消。AWAIT语句立即抛出CancelledError,而不等待TASK_b完成。但是TASK_b继续运行。外部任务(A)停止,但内部任务(B)未停止。

以下是说明这一点的程序的修改版本。主要的更改是在CancelledError异常处理程序中插入一个等待,以使您的程序多活几秒钟。我在Windows上运行,这就是为什么我也稍微更改了您的信号处理程序,但这是一个次要问题。我还在打印语句中添加了时间戳。

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    task = asyncio.create_task(task_loop())
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await asyncio.sleep(5.0)
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Output when ctrlC is struck during task1
# 
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846        

if __name__ == '__main__':
    main()

您可以看到您的程序没有工作,因为它在TASK1和Task2有机会完成之前一取消TASK_LOOP就退出了。它们一直都在那里(或者更确切地说,如果程序继续运行,它们就会在那里)。

这说明了Shield()和Cancel()是如何交互的,但它实际上并没有解决您所说的问题。为此,我认为,您需要有一个可等待的对象,以便在重要任务完成之前保持程序的生命力。这个对象需要在顶层创建,并沿着堆栈向下传递到执行重要任务的位置。这是一个与您的程序类似的程序,但以您想要的方式执行。

我执行了三次运行:(1)在任务1期间按Control-C,(2)在任务2期间按Control-C,(3)在两个任务都完成后按Control-C。在前两种情况下,程序一直持续到任务2完成。在第三种情况下,它立即结束。

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks(kwrap):
    fut = asyncio.get_running_loop().create_future()
    kwrap.awaitable = fut
    await task1()
    await task2()
    fut.set_result(1)

async def task_loop(kwrap):
    try:
        while True:
            await asyncio.shield(tasks(kwrap))
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    kwrap = KillWrapper()
    task = asyncio.create_task(task_loop(kwrap))
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await kwrap.awaitable
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

class KillWrapper:
    def __init__(self):
        self.awaitable = asyncio.get_running_loop().create_future()
        self.awaitable.set_result(0)

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 1 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 1 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824

相关文章