在数据库中使用PySpark在Azure DataLake中按分区覆盖策略(&A)

问题描述

我在Azure环境中有一个简单的ETL进程

BLOB存储&>数据工厂&>数据创建原始数据库&>数据创建 管理>;数据仓库(主ETL)。

此项目的数据集不是很大(约100万行20列),但我希望将它们在我的DataRake中正确分区为Parquet文件。

目前,我运行一些简单的逻辑来计算每个文件应该位于我的湖中的哪个位置,而不是基于商业日历。

这些文件模糊地看起来像这样

Year Week Data
2019 01   XXX
2019 02   XXX

然后,我将给定文件分区为以下格式,替换现有数据并为新数据创建新文件夹。

curated ---
           dataset --
                     Year 2019 
                              - Week 01 - file.pq + metadata
                              - Week 02 - file.pq + metadata
                              - Week 03 - file.pq + datadata #(pre existing file)

元数据是自动生成的成功和提交。

为此,我在Pyspark 2.4.3中使用了以下查询

pyspark_dataframe.write.mode('overwrite')
                         .partitionBy('Year','Week').parquet('curateddataset')

现在,如果我单独使用此命令,它将覆盖目标分区中的所有现有数据

因此Week 03将丢失。

使用spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")似乎可以停止该问题,并且只覆盖目标文件,但我不知道这是否是处理我的数据湖中文件的最佳方式?

我还发现很难找到有关上述功能的任何文档。

我的第一反应是在一块拼花地板上循环,然后手动编写每个分区,虽然这给了我更大的控制力,但循环会很慢。

我的下一个想法是将每个分区写入/tmp文件夹,并移动每个拼图文件,然后使用上面的查询根据需要替换文件/创建文件。然后清除/tmp文件夹,同时创建某种元数据日志。

有没有更好的办法/方法?

如有任何指导,将不胜感激。

这里的最终目标是拥有一个干净和安全的区域来存放所有‘经过管理’的数据,同时拥有一个镶木地板文件的日志,我可以将其读取到数据仓库中以进行进一步的ETL。


解决方案

我看到您在Azure堆栈中使用数据库。我认为最可行和推荐给您使用的方法是利用Databricks中新的三角洲湖项目:

它提供各种更新、合并和ACID事务到对象存储(如S3或Azure数据湖存储)的选项。它主要提供数据仓库为数据决策提供的管理、安全、隔离和插入/合并。其中一条途径是,由于Delta数据库的功能和灵活性,苹果实际上取代了它的数据仓库,使其完全在Delta数据库上运行。对于您的用例和许多其他使用拼图的用例,只需简单地将‘parket’替换为‘Delta’,以便使用它的功能(如果您有数据库的话)。Delta基本上是拼图的自然演变,而Databricks通过提供附加功能和开源,做得很好。

对于您的情况,我建议您尝试Delta中提供的replaceWhere选项。在进行此定向更新之前,目标表的格式必须为增量

而不是这个:

dataset.repartition(1).write.mode('overwrite')
                         .partitionBy('Year','Week').parquet('curataeddataset')

发件人https://docs.databricks.com/delta/delta-batch.html:

‘您可以有选择地仅覆盖与分区列上的谓词匹配的数据

您可以尝试以下操作:

dataset.write.repartition(1)
       .format("delta")
       .mode("overwrite")
       .partitionBy('Year','Week')
       .option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'") #to avoid overwriting Week3
       .save("curataeddataset")

另外,如果希望将分区设置为%1,为什么不使用Coalesce(%1),因为它将避免完全洗牌。

发件人https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:

‘替换当您必须运行计算开销较大的算法时,特别有用,但仅在某些分区上’

因此,我个人认为使用replacewhere手动指定您的覆盖将比仅依赖于以下各项更有针对性和计算效率: spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Databricks提供了对增量表的优化,使其成为一种更快、更高效的拼花选择(因此是一种自然演变),通过装箱和z排序:

发件人链接:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

  • 其中(装箱)

‘优化与给定分区谓词匹配的行子集。仅支持涉及分区键属性的筛选器。’

  • ZORDER BY

‘将列信息放在同一组文件中。Delta Lake数据跳过算法使用协同定位来显著减少需要读取的数据量。

  • 查询执行速度更快支持索引、统计和自动缓存

  • 数据可靠性和丰富的架构验证和事务保证

  • 简化的数据管道,支持灵活的UPSERT,在单个数据源上实现统一的结构化流+批处理

您还可以查看开放源码项目的完整文档:https://docs.delta.io/latest/index.html

..我还想说的是,我并不是在德尔塔湖/贝里克斯工作。我刚刚看到他们的改进和功能使我在工作中受益。

更新:

问题的要点是"替换现有数据并为新数据创建新文件夹",并以高度可伸缩和高效的方式完成此操作。

在拼图中使用动态分区覆盖完成了这项工作,但我觉得该方法的自然演变是使用增量表合并操作,这些操作基本上是为了‘将数据从Spark DataFrames集成到Delta Lake’而创建的。它们为您提供了额外的功能和优化,使您可以根据希望的方式合并数据,并保留对表的所有操作的日志,以便您可以在需要时回滚版本。

Delta Lake python接口(表示合并): https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder

数据库优化:https://kb.databricks.com/delta/delta-merge-into.html#discussion

使用单个合并操作,您可以指定合并条件,在本例中它可以是年、周和id的组合,然后如果记录匹配(意味着它们存在于您的Spark数据帧和增量表中,第1周和第2周),则使用您的Spark数据帧中的数据更新它们,并保持其他记录不变:

#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)

在某些情况下,如果没有匹配,则可能需要插入并创建新的行和分区,为此可以使用:

.whenNotMatchedInsertAll(condition=None)

您可以使用.转换增量操作https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta将拼图表格转换为增量表格,以便您可以使用API对其执行增量操作。

‘现在,您无需重写任何数据即可将拼图表格就地转换为Delta Lake表格。这对于将非常大的拼图表格转换为Delta表格非常有用。此外,此过程是可逆的’

您的合并案例(替换存在的数据并在数据不存在时创建新记录)可能如下所示:

(未测试,语法参见示例+API)

%python  
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`curataeddataset`")

deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year  AND target.Week = dataset.Week") 
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
如果增量表分区正确(年、周),并且正确使用了WHEN MATCHED子句,则这些操作将得到高度优化,在您的情况下可能需要几秒钟。它还通过回滚选项为您提供一致性、原子性和数据完整性。

提供的更多功能是,如果匹配成功,则可以指定要更新的列集(如果只需要更新某些列)。您还可以启用spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"),以便增量使用最少的目标分区来执行合并(更新、删除、创建)。

总体而言,我认为使用这种方法是执行定向更新的一种非常新的创新方式,因为它使您能够更好地控制更新,同时保持操作的高效。使用具有动态分区覆盖模式的拼图也可以很好地工作,但是,Delta Lake功能会为您的数据湖带来无与伦比的数据质量。

我的建议: 我会说,目前,对拼图文件使用动态分区覆盖模式进行更新,您可以试验并尝试在spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true").whenMatchedUpdateAll()的Databricks优化下仅在一个表上使用增量合并,并比较两者的性能(您的文件很小,所以我认为这不会有太大差异)。针对合并的Databricks分区修剪优化一文是在2月发表的,因此这篇文章非常新,可能会改变增量合并操作的开销(因为在幕后,它们只是创建新文件,但分区修剪可以加快它的速度)

合并python、Scala、SQL中的示例:https://docs.databricks.com/delta/delta-update.html#merge-examples

https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

相关文章