提取具有非唯一索引列日期的DaskDataFrame中的最新值

2022-04-14 00:00:00 python pandas dask

问题描述

我非常熟悉 pandas 数据帧,但我对Dask太陌生,所以我仍在努力实现代码的并行化。 我已经用 pandas 和 pandas 得到了我想要的结果,所以我想弄清楚的是,我是否可以用Dask来扩大任务的规模或以某种方式加快它的速度。

假设我的DataFrame将DateTime作为非唯一索引、一个值列和一个id列。

time                        value   id
2021-01-01 00:00:00.210281  28.08   293707
2021-01-01 00:00:00.279228  28.07   293708
2021-01-01 00:00:00.697341  28.08   293709
2021-01-01 00:00:00.941704  28.08   293710
2021-01-01 00:00:00.945422  28.07   293711
...     ...     ...
2021-01-01 23:59:59.288914  29.84   512665
2021-01-01 23:59:59.288914  29.83   512666
2021-01-01 23:59:59.288914  29.82   512667
2021-01-01 23:59:59.525227  29.84   512668
2021-01-01 23:59:59.784754  29.84   512669

我要提取的是每秒的最新值。例如,如果2021-01-01 00:00:01之前的价格是索引为2021-01-01 00:00:00.945422的行,则最新的值为28.07

在我的例子中,有时索引值不是唯一的,因此为了打破平局,我想使用id列。具有最大id数字的值将被视为最新值。对于在时间2021-01-01 23:59:59.288914并列的三个值的情况,将选择29.82值,因为该日期的最大id将是512667。另请注意,id在整个数据集中并不一致,我不能仅依靠它来对我的数据进行排序。

在 pandas 身上,我只需获取最后一个索引即可

last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]

如果last_values.index.is_unique的值为FALSE,则最后执行last_values.sort_values('id').iloc[-1]

我一直在将此代码转换为DASK时遇到困难,遇到了有关延迟函数的问题,导致它们需要计算才能再次重新索引我的数据帧。

我想知道是否有处理此类问题的最佳做法。


解决方案

下面的代码片段显示它的语法非常相似:

import dask

# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")

# generate a pandas dataframe
df = ddf.partitions[0].compute()  # pandas df for example

# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"

# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]

# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()

# check equality of the results
print(last_values_pandas == last_values_dask)
请注意,在dask版本中,区别在于两个.compute步骤,因为需要计算两个惰性值:第一个是找到正确的索引位置,第二个是获得实际值。此外,这还假设数据已经按时间戳编制了索引,如果没有,最好在加载到dask之前为数据编制索引,因为.set_index通常是一个较慢的操作。

然而,根据您在此之后的实际用途,dask可能不是一个很好的用途。如果其基本思想是进行快速查找,那么更好的解决方案是使用索引数据库(包括专门的时间序列数据库)。

最后,上面的代码片段使用了唯一索引。如果实际数据具有非唯一索引,那么根据最大id选择的要求应该是在计算出last_values_dask之后处理的,方法如下(伪代码,不会立即生效):

def get_largest_id(last_values):
    return last_values.sort_values('id').tail(1)

last_values_dask = get_largest_id(last_values_dask)

如果查找的是批次(而不是特定的样本日期),则可以设计更好的管道。

相关文章