ValueError: A BigQuery table or a query must be specified with beam.io.gcp.bigquery.ReadFromBigQuery

Problem description

I am trying to pass a BigQuery table name as a value provider to an Apache Beam pipeline template. According to their documentation and a Stack Overflow answer, it is possible to pass a value provider to apache_beam.io.gcp.bigquery.ReadFromBigQuery.

Here is my pipeline code:

class UserOptions(PipelineOptions):
    """Define runtime argument"""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)

(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
             user_options.input
         ))

When I run the code locally, the value of user_options.input passed on the command line is --input projectid.dataset_id.table

However, I run into the error:

ValueError: A BigQuery table or a query must be specified

I have tried:

  • Passing projectid:dataset_id.table

  • Using bigquery.TableReference -> not possible

  • Using f'{user_options.input}'

  • Passing a query -> works when running locally, but fails when the template is invoked on GCP. The error message:

    Missing dataset while no default dataset is set in the request. Errors: [{"message": "Table name "RuntimeValueProvider(option: input, type: str, default_value: None)" missing dataset while no default dataset is set in the request.", "domain": "global", "reason": "invalid"}], "status": "INVALID_ARGUMENT"}}>

What am I missing?


Solution

The table argument must be passed to ReadFromBigQuery by name

BigQuerySource (deprecated) accepts table as its first argument, so you can pass one positionally (docs). But ReadFromBigQuery expects gcs_location as its first argument (docs). So if you are porting code from BigQuerySource to ReadFromBigQuery and you do not explicitly pass the table in by name, it will fail with exactly the error you received.
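The positional-binding pitfall can be sketched with a hypothetical stand-in function whose parameter order mirrors ReadFromBigQuery (gcs_location first, table only reachable by keyword). The function below is illustrative only, not the real Beam implementation:

```python
# Hypothetical stand-in: gcs_location comes first, like in ReadFromBigQuery,
# so a table string passed positionally lands in gcs_location, not table.
def read_from_bigquery(gcs_location=None, table=None, dataset=None,
                       project=None, query=None):
    if table is None and query is None:
        raise ValueError("A BigQuery table or a query must be specified")
    return {"gcs_location": gcs_location, "table": table}

# Positional: the string is bound to gcs_location, table stays None.
try:
    read_from_bigquery("my_project:my_dataset.my_table")
except ValueError as e:
    print(e)  # A BigQuery table or a query must be specified

# By name: the table argument is bound correctly.
result = read_from_bigquery(table="my_project:my_dataset.my_table")
print(result["table"])  # my_project:my_dataset.my_table
```

The same keyword-vs-positional distinction is all that separates the working and failing pipelines below.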

Here are two working examples and one that does not work:

import apache_beam as beam

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

if __name__ == "__main__":
    args = [
        '--temp_location=gs://my_temp_bucket',
    ]
    # This works:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
        )
    # So does this:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
        )
    # But this doesn't work because the table argument is not passed in by name.
    # The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery'
          >> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
        )
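Separately, the f'{user_options.input}' attempt from the question cannot work either: at pipeline-construction time the option is still a RuntimeValueProvider object, so string formatting bakes the object's repr into the string instead of the runtime table name, which is exactly the garbled table name visible in the template error above. A minimal sketch with a hypothetical stand-in class (not Beam's actual implementation):

```python
# Hypothetical stand-in for Beam's RuntimeValueProvider, only to show
# what f-string formatting does to the object at construction time.
class FakeRuntimeValueProvider:
    def __init__(self, option_name):
        self.option_name = option_name
        self.value = None  # the runtime value is not available yet

    def __repr__(self):
        return (f"RuntimeValueProvider(option: {self.option_name}, "
                f"type: str, default_value: None)")

provider = FakeRuntimeValueProvider("input")
table = f"{provider}"  # the repr is baked in, not the eventual table name
print(table)  # RuntimeValueProvider(option: input, type: str, default_value: None)
```

Passing the provider object itself by name, as in beam.io.ReadFromBigQuery(table=user_options.input), lets Beam resolve the value at runtime, which is the behavior the documentation linked in the question describes.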
