使用ValueProvider格式化数据流中的BigQuery

问题描述

我当前正在使用Dataflow在Python中执行循环批处理。

基本上我从BigQuery读取数据并在其上执行操作。我的管道如下所示

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )

我希望使用数据流模板运行作业,以使其适应运行时。

多亏了文档https://cloud.google.com/dataflow/docs/guides/templates/creating-templates,即函数部分中的Using ValueProvider,我成功地使用Pardo在运行时为"do_ome_Stuff"提供了一个额外的参数。


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

但我还希望更改进程关注的用户数,因此我希望使查询适应运行时。


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

.这不起作用,因为我在管道执行之前调用了get()。到目前为止,我还没有成功地将我对do_ome_Stuff函数所做的修改修改为"Read"行

任何关于如何进行的建议或解决方案都将不胜感激。谢谢!


解决方案

遗憾的是,BigQuerySource不支持值提供程序。这是因为它是在数据流运行器中本机实现的,因此所有信息都需要在管道构造时可用。

您可以尝试转换apache_beam.io.gcp.bigquery.ReadFromBigQuery-这将允许您使用值提供程序。

相关文章