使用Asyncio逐行读取文件
问题描述
我希望在写入时读取几个日志文件,并使用Asyncio处理它们的输入。代码必须在Windows上运行。根据我对Stackoverflow和Web的搜索了解,异步文件I/O在大多数操作系统上都很棘手(例如,select
不会按预期工作)。虽然我确信我可以使用其他方法(例如线程)来做到这一点,但我想我应该尝试一下asyncio,看看它是什么样子。最有帮助的答案可能是描述此问题的解决方案的"体系结构"应该是什么样子的答案,即应该如何调用或调度不同的函数和协程。
下面提供了一个生成器,可以逐行读取文件(通过轮询,这是可以接受的):
import time
def line_reader(f):
while True:
line = f.readline()
if not line:
time.sleep(POLL_INTERVAL)
continue
process_line(line)
由于要监视和处理多个文件,这类代码需要线程。我稍微修改了一下,使其在异步时更易用:
import asyncio
def line_reader(f):
while True:
line = f.readline()
if not line:
yield from asyncio.sleep(POLL_INTERVAL)
continue
process_line(line)
当我通过异步事件循环安排它时,这种方式可以工作,但是如果process_data
阻塞,那当然就不好了。刚开始的时候,我想象解决方案应该类似于
def process_data():
...
while True:
...
line = yield from line_reader()
...
但我想不出如何才能做到这一点(至少process_data
需要管理相当多的状态)。
您对如何构建此类代码有什么想法吗?
解决方案
根据我对Stackoverflow和Web的搜索了解,异步文件I/O在大多数操作系统上都很棘手(例如,SELECT将不会按预期工作)。虽然我确信我可以使用其他方法(例如线程)做到这一点,但我想我还是尝试一下异步方式,看看它是什么样子。
asyncio
是基于幕后的*nix系统的select
,因此如果不使用线程,您将无法执行非阻塞文件I/O。在Windows上,asyncio
可以使用支持非阻塞文件I/O的IOCP,但asyncio
不支持。
您的代码很好,只是您应该在线程中阻塞I/O调用,这样如果I/O很慢,您就不会挡路事件循环。幸运的是,使用loop.run_in_executor
函数将工作分流到线程非常简单。
首先,为您的I/O设置专用线程池:
from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()
然后只需将所有阻塞I/O调用卸载到Executor:
...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
相关文章