数据流BigQuery-BigQuery管道对较小的数据执行,但不对大型生产数据集执行

问题描述

这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。

管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。

我尝试处理的数据集大约为500 GB,包含46M条记录。

当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容:

当我尝试使用完整的数据集运行它时,它启动得非常快,但随后逐渐减少,最终失败。此时,作业失败,并添加了大约900k元素,约为6-7 GB,然后元素计数实际上开始减少。

我使用的是250名工人和一台N1-Highmem-6型机器

在工人日志中,我得到以下几条(大约10条):

Info
2021-04-22 06:29:38.236 EDTRefreshing due to a 401 (attempt 1/2)

这是最后的警告之一:

2021-04-22 06:29:32.392 EDTS08:[85]: GetArticles/Read+[85]: GetArticles/_PassThroughThenCleanup/ParDo(PassThrough)/ParDo(PassThrough)+[85]: ExtractEntity+[85]: WriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal+[85]: WriteToBigQuery/BigQueryBatchFileLoads/AppendDestination+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/IdentityWorkaround+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Write+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(_ShardDestinations)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Write failed.

在执行详细信息中有多个:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

我假设这些项目来自数据集中可能需要一段时间才能处理的较大文本,所以在稍等片刻之后,这些项目将被处理,下一个项目将开始。

还有几个:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

所有这一切都让我有点困惑,而且不是完全直观--尽管它工作时的服务很棒。

我正在从Jupyter笔记本执行作业(不使用交互式运行器,只执行脚本)。

主管道如下:

p = beam.Pipeline()

#Create a collection from Bigquery
articles = p | "GetArticles" >> beam.io.ReadFromBigQuery(query='SELECT id,uuid, company_id_id, title, full_text, FROM `MY TABLE` ', gcs_location=dataflow_gcs_location, project='my_project',use_standard_sql=True)

#Extract entities with NLP
entities = articles | "ExtractEntity" >> beam.ParDo(EntityExtraction()) 

#Write to bigquery 
entities | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('myproject:dataset.table', schema = schema,custom_gcs_temp_location=dataflow_gcs_location, create_disposition="CREATE_IF_NEEDED",write_disposition="WRITE_APPEND")  ```

我做错了什么?这是记忆问题吗?我不应该像这样读写BigQuery,而是输出到一个文件并从中创建表吗?我希望得到一些帮助,很抱歉发了这么长的帖子,我想提供尽可能多的背景信息。


解决方案

可能来晚了,但我在一个包含7.7M行的BigQuery表上进行了一些测试,这些行带有大约要处理的字符串。350字。

我运行了与您相同的管道:

  1. 从BigQuery读取数据
  2. 使用pythonstring库清理字符串
  3. 使用Spacyfr_core_news_lg模型,获取字符串的词条部分
  4. 将数据写回BigQuery(在另一个表中)

一开始我遇到了和您一样的问题,每秒的元素数随着时间的推移而下降。

我意识到这是RAM的问题。我将machine_typecustom-1-3072更改为custom-1-15360-ext,并从与您相同的配置文件更改为此配置文件:

我认为数据流可以使用NLP模型处理大量行,但您必须确保为工作进程提供足够的RAM。

此外,使用number_of_worker_harness_threads=1确保数据流不会产生多个线程(从而将RAM拆分成线程)也很重要。

您也可以看看这个stack thread,初始问题是一样的。

最后,我的员工的CPU利用率从:

收件人:

这也是内存不足的标志。

编辑:我使用与您相同的数据量比例运行我的管道,以确保我的测试没有偏差,并且结果是相同的:RAM大小似乎是使作业顺利运行的关键:

相关文章