ValueError:必须使用beam.io.gcp.bigquery.ReadFromBigQuery指定BigQuery表或查询
问题描述
我正在尝试传递一个BigQuery表名作为一个ApacheBEAM管道模板的值提供者。根据their documentation和StackOverflow answer,可以将值提供程序传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery
。
这就是我的管道代码
class UserOptions(PipelineOptions):
"""Define runtime argument"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str)
parser.add_value_provider_argument('--output', type=str)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
user_options.input
)
当我本地运行代码时,命令行传递user_options.input
is--input projectid.dataset_id.table
但是,我遇到错误:
ValueError: A BigQuery table or a query must be specified
我已尝试:
传递
projectid:dataset_id.table
使用
bigquery.TableReference
-&>不可能使用
f'
{user_options.input}'
传递查询-&>在本地运行时有效,但在GCP上调用模板时无效。错误语句:
缺少数据集,但请求中未设置默认数据集。错误:[{";Message";:";Table name";RounmeValueProvider(Option:Input,type:Str,Default_Value:None)],";请求中未设置默认数据集。";,";域";:";全局";,";原因";:";无效";},";状态";:&QOOT;INVALID_ARGUMENT&QOOT;}}>;
我错过了什么?
解决方案
table
参数必须按名称传递给ReadFromBigQuery
。
BigQuerySource
(已弃用)接受table
作为第一个参数,因此您可以按位置(docs)传递一个。但ReadFromBigQuery
预期gcs_location
作为第一个参数(docs)。因此,如果您正在将代码从使用BigQuerySource
移植到使用ReadFromBigQuery
,并且您没有显式地按名称传入表,那么它将失败,并显示您收到的错误。
这里有两个工作示例和一个不工作的示例:
import apache_beam as beam
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
if __name__ == "__main__":
args = [
'--temp_location=gs://my_temp_bucket',
]
# This works:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
)
# So does this:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
)
# But this doesn't work becuase the table argument is not passed in by name.
# The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
)
相关文章