使用Asyncio逐行读取文件

2022-03-25 00:00:00 python python-asyncio

问题描述

我希望在写入时读取几个日志文件,并使用Asyncio处理它们的输入。代码必须在Windows上运行。根据我对Stackoverflow和Web的搜索了解,异步文件I/O在大多数操作系统上都很棘手(例如,select不会按预期工作)。虽然我确信我可以使用其他方法(例如线程)来做到这一点,但我想我应该尝试一下asyncio,看看它是什么样子。最有帮助的答案可能是描述此问题的解决方案的"体系结构"应该是什么样子的答案,即应该如何调用或调度不同的函数和协程。

下面提供了一个生成器,可以逐行读取文件(通过轮询,这是可以接受的):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

由于要监视和处理多个文件,这类代码需要线程。我稍微修改了一下,使其在异步时更易用:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

当我通过异步事件循环安排它时,这种方式可以工作,但是如果process_data阻塞,那当然就不好了。刚开始的时候,我想象解决方案应该类似于

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

但我想不出如何才能做到这一点(至少process_data需要管理相当多的状态)。

您对如何构建此类代码有什么想法吗?


解决方案

根据我对Stackoverflow和Web的搜索了解,异步文件I/O在大多数操作系统上都很棘手(例如,SELECT将不会按预期工作)。虽然我确信我可以使用其他方法(例如线程)做到这一点,但我想我还是尝试一下异步方式,看看它是什么样子。

asyncio是基于幕后的*nix系统的select,因此如果不使用线程,您将无法执行非阻塞文件I/O。在Windows上,asyncio可以使用支持非阻塞文件I/O的IOCP,但asyncio不支持。

您的代码很好,只是您应该在线程中阻塞I/O调用,这样如果I/O很慢,您就不会挡路事件循环。幸运的是,使用loop.run_in_executor函数将工作分流到线程非常简单。

首先,为您的I/O设置专用线程池:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()

然后只需将所有阻塞I/O调用卸载到Executor:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...

相关文章