如何监控Python事件循环的繁忙程度?

2022-03-25 00:00:00 python python-asyncio

问题描述

我有一个异步应用程序,它通过aiohttp服务请求并执行其他异步任务(作为HTTP客户端与数据库交互、处理消息、发出请求)。我想监视事件循环的繁忙程度,比如它执行代码所花费的时间与等待select完成相比所花费的时间。select

有没有办法用标准库事件循环或其他第三方循环(Uvloop)来衡量这一点?

具体地说,我想要一个持续的饱和度百分比度量,而不仅仅是this question似乎要解决的二进制"是否繁忙"。


解决方案

挖源代码如下:

  1. 事件循环基本上是executing_run_oncewhile True循环中
  2. _run_once是否所有工作都在等待select完成including
  3. timeout等待selectisn't存储在_run_once之外的任何位置

没有什么能阻止我们重新实现_run_once,这样我们就可以为我们需要的东西计时。

我们不复制整个_run_once实现,而是正好在select开始之前计时(因为在select之前没有耗时)和selectis when_process_events开始后计时。

从言语到行动:

import asyncio

class MeasuredEventLoop(asyncio.SelectorEventLoop):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._total_time = 0
        self._select_time = 0

        self._before_select = None

    # TOTAL TIME:
    def run_forever(self):
        started = self.time()
        try:
            super().run_forever()
        finally:
            finished = self.time()
            self._total_time = finished - started

    # SELECT TIME:
    def _run_once(self):
        self._before_select = self.time()
        super()._run_once()

    def _process_events(self, *args, **kwargs):
        after_select = self.time()
        self._select_time += after_select - self._before_select
        super()._process_events(*args, **kwargs)

    # REPORT:
    def close(self, *args, **kwargs):
        super().close(*args, **kwargs)

        select = self._select_time
        cpu = self._total_time - self._select_time
        total = self._total_time

        print(f'Waited for select: {select:.{3}f}')
        print(f'Did other stuff: {cpu:.{3}f}')
        print(f'Total time: {total:.{3}f}')

让我们测试一下:

import time


async def main():
    await asyncio.sleep(1)  # simulate I/O, will be handled by selectors
    time.sleep(0.01)        # CPU job, executed here, outside event loop
    await asyncio.sleep(1)
    time.sleep(0.01)


loop = MeasuredEventLoop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()

结果:

Waited for select: 2.000
Did other stuff: 0.032
Total time: 2.032

让我们对照具有实际I/O的代码测试它:

import aiohttp


async def io_operation(delay):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'http://httpbin.org/delay/{delay}') as resp:
            await resp.text()


async def main():
    await asyncio.gather(*[
        io_operation(delay=1),
        io_operation(delay=2),
        io_operation(delay=3),
    ])

结果:

Waited for select: 3.250
Did other stuff: 0.016
Total time: 3.266

相关文章