Dask集群:AttributeError:'DataFrame'对象没有属性'_data'

2022-04-14 00:00:00 pandas dask dask-dataframe dockerfile

问题描述

我在GCP上使用Dask群集。我正在使用此代码来部署它:

from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client

enviroment_vars = {
    'EXTRA_PIP_PACKAGES': '"gcsfs"'
}

cluster = GCPCluster(
    n_workers=32,
    docker_image='daskdev/dask:2021.2.0',
    env_vars=enviroment_vars,
    network='my-network',
    #filesystem_size=150,
    machine_type='e2-standard-16',
    projectid='my-project-id',
    zone='us-central1-a',
    on_host_maintenance="MIGRATE"
)

client = Client(cluster)

然后我读取CSV文件,代码如下:

import dask.dataframe as dd
import csv

col_dtypes = {
    'var1': 'float64',
    'var2': 'object',
    'var3': 'object',
    'var4': 'float64'
}

df = dd.read_csv('gs://my_bucket/files-*.csv', blocksize=None, dtype= col_dtypes)
df = df.persist()

一切正常,但当我尝试进行一些查询或计算时,我得到一个错误。例如,这段代码:

df.var1.value_counts().compute()

这是输出:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-14-711a7c21ed42> in <module>
----> 1 df.var1.value_counts().compute()

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    279         dask.base.compute
    280         """
--> 281         (result,) = compute(self, traverse=False, **kwargs)
    282         return result
    283 

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    561         postcomputes.append(x.__dask_postcompute__())
    562 
--> 563     results = schedule(dsk, keys, **kwargs)
    564     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    565 

/opt/conda/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2653                     should_rejoin = False
   2654             try:
-> 2655                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2656             finally:
   2657                 for f in futures.values():

/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1962             else:
   1963                 local_worker = None
-> 1964             return self.sync(
   1965                 self._gather,
   1966                 futures,

/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836             return future
    837         else:
--> 838             return sync(
    839                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    840             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1827                             exc = CancelledError(key)
   1828                         else:
-> 1829                             raise exception.with_traceback(traceback)
   1830                         raise exc
   1831                     if errors == "skip":

/opt/conda/lib/python3.8/site-packages/dask/optimization.py in __call__()
    961         if not len(args) == len(self.inkeys):
    962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964 
    965     def __reduce__(self):

/opt/conda/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

/opt/conda/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/lib/python3.8/site-packages/dask/utils.py in apply()
     33 def apply(func, args, kwargs=None):
     34     if kwargs:
---> 35         return func(*args, **kwargs)
     36     else:
     37         return func(*args)

/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5474             return meta
   5475         if is_dataframe_like(df):
-> 5476             check_matching_columns(meta, df)
   5477             c = meta.columns
   5478         else:

/opt/conda/lib/python3.8/site-packages/dask/dataframe/utils.py in check_matching_columns()
    690 def check_matching_columns(meta, actual):
    691     # Need nan_to_num otherwise nan comparison gives False
--> 692     if not np.array_equal(np.nan_to_num(meta.columns), np.nan_to_num(actual.columns)):
    693         extra = methods.tolist(actual.columns.difference(meta.columns))
    694         missing = methods.tolist(meta.columns.difference(actual.columns))

/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
   5268             or name in self._accessors
   5269         ):
-> 5270             return object.__getattribute__(self, name)
   5271         else:
   5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):

pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.__get__()

/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
   5268             or name in self._accessors
   5269         ):
-> 5270             return object.__getattribute__(self, name)
   5271         else:
   5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):

AttributeError: 'DataFrame' object has no attribute '_data'

我的Dockerfile中的pandas版本是1.0.1,因此我尝试将pandas升级到1.2.2,但问题依旧。我哪里做错了?


解决方案

我猜测是某处存在版本不匹配(例如客户端、调度器与工作节点上的pandas或dask版本不一致)。运行 client.get_versions(check=True) 的输出是什么?

相关文章