阿帕奇光束到BigQuery

问题描述

我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。

我的完整代码在这里:https://pastebin.com/4W9Vu4Km

基本上我的问题是我不知道如何在WriteBatchesToBQ(第73行)中指定变量element应该写入BQ。

我还尝试在管道中直接使用beam.io.gcp.bigquery.WriteToBigQuery(第128行),但随后收到错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我向它提供的不是词典,而是词典列表(我希望使用1分钟窗口)。

有什么想法吗?(另外,如果代码中有一些太愚蠢的地方,请告诉我-我只使用了很短的时间,我可能忽略了一些明显的问题)。


解决方案

第二种方法是解决这个问题,您需要直接在管道中使用WriteToBigQuery函数。但是,需要包括beam.FlatMap步骤,以便WriteToBigQuery能够正确处理词典列表。

因此,完整的管道拆分数据按时间分组并写入BQ的定义如下:

 accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
            window_size) | "FlatMap" >> beam.FlatMap(
            lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
                                                                                               schema=(
                                                                                                   output_table_bq_schema),
                                                                                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

完整的工作代码在此处:https://pastebin.com/WFwBvPcU

相关文章