DASK计算过去n天的分组滚动平均值,并分配给原始数据帧
问题描述
我正试图通过在DASK中滚动均值逻辑来复制下面的 pandas 群体。但停留在1)如何指定时间段(以天为单位)和2)如何将其分配回原始帧?
df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())
获得如下错误:
ValueError: index must be monotonic
,ValueError: Not all divisions are known, can't align partitions
或ValueError: cannot reindex from a duplicate axis
完整示例
import pandas as pd
import dask.dataframe
df1 = pd.DataFrame({'g':['a']*10,'v':range(10)},index=pd.date_range('2020-01-01',periods=10))
df2=df1.copy()
df2['g']='b'
df = pd.concat([df1,df2]).sort_index()
df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())
ddf = dask.dataframe.from_pandas(df, npartitions=4)
# works
ddf.groupby('g')['v'].apply(lambda x: x.rolling(3).mean(), meta=('avg3d', 'f8')).compute()
# rolling time period fails
ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('avg3d', 'f8')).compute()
# how do I add it to the rest of the data??
# neither of these work
ddf['avg3d']=ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('x', 'f8'))
ddf['avg3d']=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8'))
ddft = ddf.merge(ddf3d)
ddf.assign(avg3d=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8')))
已查看
dask groupby apply then merge back to dataframe
Dask rolling function by group syntax
Compute the rolling mean over the last n days in Dask
ValueError: Not all divisions are known, can't align partitions error on dask dataframe
解决方案
此问题源于.groupby
在dASK中的当前实现。下面的答案不是完整的解决方案,但有望解释错误发生的原因。
首先,让我们确保获得true_result
,我们可以将DASK结果与其进行比较:
import dask.dataframe
import pandas as pd
df1 = pd.DataFrame(
{"g": ["a"] * 10, "v": range(10)}, index=pd.date_range("2020-01-01", periods=10)
)
df = pd.concat([df1, df1.assign(g="b")]).sort_index()
df["avg3d"] = df.groupby("g")["v"].transform(lambda x: x.rolling("3D").mean())
true_result = df["avg3d"].array
现在,运行用#works
注释的代码将每次生成不同的值,即使数据或计算没有随机性来源:
ddf = dask.dataframe.from_pandas(df, npartitions=4)
# this doesn't work
dask_result_1 = ddf.groupby("g")["v"].apply(
lambda x: x.rolling(3).mean(), meta=("avg3d", "f8")
).compute().array
# this will fail, every time for a different reason
assert all(dask_result_1 == true_result)
为什么会发生这种情况?好的,在幕后,Dask会想要打乱数据,以确保groupby
变量的所有值都在单个分区中。这种混洗似乎是随机的,因此当这些值被缝合在一起时,它们可能会打乱原始顺序。
因此,解决此问题的一个快捷方法是在滚动计算之前添加排序:
# rolling time period works
avg3d_dask = (
ddf.groupby("g")["v"]
.apply(lambda x: x.sort_index().rolling("3D").mean(), meta=("avg3d", "f8"))
.compute()
.droplevel(0)
.sort_index()
)
# this will always pass
assert all(avg3d_dask == true_result)
现在,我们如何将其添加到原始数据名中?我不知道有什么简单的方法可以做到这一点,但最困难的方法之一是计算原始DaskDataFrame的分区,然后将数据分割成适当的块并进行分配。然而,这种方法不是很健壮(或者至少需要大量特定于用例的微调),因此希望有人能为这一部分提供更好的解决方案。
相关文章