将一列随机数添加到DaskDataFrame的正确方法

问题描述

将一列随机数添加到DaskDataFrame的正确方法是什么?我显然可以使用map_partitions将列添加到每个分区,但我不确定当Dask并行化该计算时如何处理随机状态。(即,它是否会在所有工作进程中使用相同的随机状态,从而在每个工作进程中生成相同的随机数?)

dask.array.random(https://docs.dask.org/en/latest/_modules/dask/array/random.html)中似乎有相关函数,但我找不到如何将这些函数与DaskDataFrame一起使用的示例。


解决方案

根据此讨论 (https://github.com/dask/distributed/issues/2558),不需要设置/跟踪numpy种子,推荐的方法是使用dask.array(问题中提到过)。也许,实现可重现随机性的最佳途径是创建dask.array并转换为dask.dataframe

import dask.array as da

# this is not reproducible
for _ in range(3):
    x = da.random.random((10, 1), chunks=(2, 2))
    print(x.sum().compute())

# this is reproducible
for _ in range(3):
    state = da.random.RandomState(1234)
    y = state.random(size=(10,1), chunks=(2,2))
    print(y.sum().compute())

# conver to ddf
import dask.dataframe as dd
ddf = dd.from_dask_array(y, columns=['A'])

# if there's another existing dataframe ddf2
ddf2 = dd.from_pandas(pd.DataFrame(range(10), columns=['B']), npartitions=2)
ddf2

# then simple column assignment will work even if partitions are not aligned
ddf2['A'] = ddf['A']
print((ddf.compute() == ddf2[['A']].compute()).sum() == len(ddf))

# of course it will be more efficient to have partitions align
# you can inspect the DAG with ddf2.visualize() to see why
# also note carefully that the lengths of ddf and ddf2 should match
# otherwise there might be unexpected situations downstream
# to see why, try changing the size of `y` above and then compare
# ddf and ddf2

相关文章