并行读取文件并参数化类参数
问题描述
假设我有一个类,并且希望并行从磁盘读取几个文件,并将类参数参数化。做这件事最正确的方法是什么(以及如何做)?- 主线程应等待load_data()操作结束,然后再进行任何其他操作。
我考虑过线程化,因为它只是I/O操作。
非并行实现示例(单线程):
import pandas as pd
class DataManager(object):
def __init__(self):
self.a = None
self.b = None
self.c = None
self.d = None
self.e = None
self.f = None
def load_data(self):
self.a = pd.read_csv('a.csv')
self.b = pd.read_csv('b.csv')
self.c = pd.read_csv('c.csv')
self.d = pd.read_csv('d.csv')
self.e = pd.read_csv('e.csv')
self.f = pd.read_csv('f.csv')
if __name__ == '__main__':
dm = DataManager()
dm.load_data()
# Main thread is waiting for load_data to finish.
print("finished loading data")
cpu
I/O操作在大多数情况下不受推荐答案限制,因此使用多个进程是过度的。使用多个线程可能很好,但是pb.read_csv
不仅可以读取文件,还可以对其进行CPU受限的解析。我建议您在最初为此目的创建文件时立即使用Asyncio从磁盘读取文件。以下是执行此操作的代码:
import asyncio
import aiofiles
async def read_file(file_name):
async with aiofiles.open(file_name, mode='rb') as f:
return await f.read()
def read_files_async(file_names: list) -> list:
loop = asyncio.get_event_loop()
return loop.run_until_complete(
asyncio.gather(*[read_file(file_name) for file_name in file_names]))
if __name__ == '__main__':
contents = read_files_async([f'files/file_{i}.csv' for i in range(10)])
print(contents)
函数read_files_async
返回文件内容(字节缓冲区)列表,您可以将其传递给pd.read_csv
。
我觉得优化文件只读应该足够了,但是可以多进程并行解析文件内容(线程和异步不会提高解析过程的性能):
import multiprocessing as mp
NUMBER_OF_CORES = 4
pool = mp.Pool(NUMBER_OF_CORES)
pool.map(pb.read_csv, contents)
您应该根据您的计算机规格设置NUMBER_OF_CORES
。
相关文章