Python和DASK-读取和连接多个文件

2022-04-14 00:00:00 python dask dask-delayed

问题描述

我有一些parquet文件,它们都来自相同的域,但在结构上有所不同。我需要将它们全部连接起来。以下是这些文件的一些示例:

file 1:
A,B
True,False
False,False

file 2:
A,C
True,False
False,True
True,True

我要做的是以尽可能快的方式读取和连接这些文件,获得以下结果:

A,B,C
True,False,NaN
False,False,NaN
True,NaN,False
False,NaN,True
True,NaN,True

为此,我使用以下代码,使用(Reading multiple files with Dask,Dask dataframes: reading multiple files & storing filename in column)提取:

import glob

import dask.dataframe as dd
from dask.distributed import Client
import dask

def read_parquet(path):
    return pd.read_parquet(path)

if __name__=='__main__':

    files = glob.glob('test/*/file.parquet')

    print('Start dask client...')
    client = Client()

    results = [dd.from_delayed(dask.delayed(read_parquet)(diag)) for diag in diag_files]

    results = dd.concat(results).compute()

    client.close()

这段代码可以工作,而且它已经是我能想到的最快的版本(我尝试了顺序pandasmultiprocessing.Pool)。我的想法是,理想情况下,DASK可以在仍然读取一些文件的同时启动部分串联,然而,从任务图中我看到了对每个拼图文件的元数据的顺序读取,请参见下面的屏幕截图:

任务图的第一部分是read_parquetread_metadata的混合体。第一部分始终只显示一个已执行的任务(在任务处理选项卡中)。第二部分是from_delayedconcat的组合,它正在使用我的所有工作人员。

有什么建议可以加快文件读取速度,减少图表第一部分的执行时间吗?


解决方案

代码的问题在于您使用Pandas版本的 READ_PARQUET。

改为:

  • 任务READ_PARQUET版本,
  • 客户端提供的映射和聚集方法,
  • 任务Conat的版本,

类似:

def read_parquet(path):
    return dd.read_parquet(path)

def myRead():
    L = client.map(read_parquet, glob.glob('file_*.parquet'))
    lst = client.gather(L)
    return dd.concat(lst)

result = myRead().compute()

在此之前,我只创建了一次客户端。 原因是在我之前的实验中,我得到了一个错误 当我尝试再次创建它时(在函数中),甚至 虽然第一个实例以前已关闭。

相关文章