Python和DASK-读取和连接多个文件
问题描述
我有一些parquet
文件,它们都来自相同的域,但在结构上有所不同。我需要将它们全部连接起来。以下是这些文件的一些示例:
file 1:
A,B
True,False
False,False
file 2:
A,C
True,False
False,True
True,True
我要做的是以尽可能快的方式读取和连接这些文件,获得以下结果:
A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True
为此,我使用以下代码,使用(Reading multiple files with Dask,Dask dataframes: reading multiple files & storing filename in column)提取:
import glob
import dask.dataframe as dd
from dask.distributed import Client
import dask
def read_parquet(path):
return pd.read_parquet(path)
if __name__=='__main__':
files = glob.glob('test/*/file.parquet')
print('Start dask client...')
client = Client()
results = [dd.from_delayed(dask.delayed(read_parquet)(diag)) for diag in diag_files]
results = dd.concat(results).compute()
client.close()
这段代码可以工作,而且它已经是我能想到的最快的版本(我尝试了顺序pandas
和multiprocessing.Pool
)。我的想法是,理想情况下,DASK可以在仍然读取一些文件的同时启动部分串联,然而,从任务图中我看到了对每个拼图文件的元数据的顺序读取,请参见下面的屏幕截图:
任务图的第一部分是read_parquet
和read_metadata
的混合体。第一部分始终只显示一个已执行的任务(在任务处理选项卡中)。第二部分是from_delayed
和concat
的组合,它正在使用我的所有工作人员。
有什么建议可以加快文件读取速度,减少图表第一部分的执行时间吗?
解决方案
代码的问题在于您使用Pandas版本的 READ_PARQUET。
改为:
- 任务READ_PARQUET版本,
- 客户端提供的映射和聚集方法,
- 任务Conat的版本,
类似:
def read_parquet(path):
return dd.read_parquet(path)
def myRead():
L = client.map(read_parquet, glob.glob('file_*.parquet'))
lst = client.gather(L)
return dd.concat(lst)
result = myRead().compute()
在此之前,我只创建了一次客户端。 原因是在我之前的实验中,我得到了一个错误 当我尝试再次创建它时(在函数中),甚至 虽然第一个实例以前已关闭。
相关文章