数据湖Iceberg技术在小米的落地与场景应用

2023-01-03 00:00:00 数据 用户 文件 分区 离线

导读:随着流批一体技术的发展,和对实时查询的需求以及出于成本的优化考虑,小米对数据湖 iceberg 技术做了一些实践和场景落地。

今天介绍的内容主要有以下四个方面:

  • Iceberg技术简介

  • Iceberg在小米的应用实践

  • 基于Iceberg的流批一体的探索

  • 未来规划

分享嘉宾|李培殿 小米 软件研发工程师

编辑整理|陈业利 英祐科技

出品社区|DataFun



01
Iceberg技术简介

首先介绍一下Iceberg技术。

Iceberg是一个基于大型分析型数据上的一个表格式,它允许将一些文件、数据集以表的形式提供给spark、trino、prestodb、flink、hive这些计算引擎。

通过下面右图可以看到iceberg所处的位置,与hudi, delta lake相同。

通过iceberg这个抽象层,将上层的计算与下层的存储进行分离,这样就使我们在存储和计算上的选择更灵活。

下层有parquet、orc、avro可以选择,底层的实际物理存储上可以选择s3, aliyun oss以及HDFS。通过iceberg这个抽象,大的优势是可以将底层文件的细节对用户屏蔽,用户可以通过表的方式去访问,而不需要关心底层到底是存了什么格式的文件,或是存在哪里。

Iceberg实现的本质原理是一种文件的组织方式,包括4级的结构。

从下往上看,底层是一些写入的数据文件,即data file。当我们写完这些data file之后,它的元数据文件,会有一些清单文件,即manifest文件,来记录文件和分区的关系。在iceberg也有分区的概念,分区和文件对应关系都记录在清单文件里面。当这个清单文件写完之后都会形成一个快照,每次commit都会形成一个新的快照。快照会通过metadata文件来进行记录。这样如果我们使用一些历史回溯,就可以通过这个文件索引去确定使用哪些快照。

Iceberg有三大优势,即:事务性、隐式分区和行级更新。

事务性带来的好处是:

  • 首先,可以避免写入失败带来的脏数据。

  • 其次,我们可以用刚才所说的快照的方式来实现读写分离。以hive为例,比如hive读的程序已经启动,结果这些文件刚好被overwrite了,就可能会导致读程序的失败。而iceberg这种使用快照方式通过读到不同的快照来进行分离,做到读写的隔离。

Iceberg的第二大优势是它提供了一个隐式分区。

我们可以在建表的时候对一个列做一个转换,用一个如transform的函数来做一个隐式分区。这样当我们写入数据的时候,不需要额外指定一个分区来写入,直接写就可以了,它会根据数据去判定到底落在哪个分区,而且它的分区也不跟目录强绑定。另外它提供了partition evolution机制,提供灵活的分区变更,当我们在把月替换成天,底层查询时会执行不同的查询计划。

它的第三大优势是行级更新的能力。

iceberg现有两个版本,即V1版本和V2版本,我们经常称V1表和V2表。V1表的更新采用Copy On Write(COW)模式,就是将需要更新的文件读取出来做更新后再写入。在V2表中除了Copy On Write,还增加了Merge On Read(MOR)。

下图是对Merge On Read的一个简单介绍,它通过记录另外两个文件,即position delete和equality delete文件来对已有的文件进行删除,当读取的时候进行merge得到终的结果。

02

Iceberg在小米的应用实践

介绍了iceberg的一些背景知识后,这一章将介绍iceberg在小米的一些实践场景。

现在iceberg在小米大概有4000多张表,数据大概是8PB,其中v1表1000多张,v2表3000多张。v1表主要是一些日志场景,v2表是可以配一些需要通过主键进行更新的场景。

v2表多的场景就是一些cdc数据,也就是ChangeLog的数据入湖。链路大概如下,通过flink cdc采集mysql、oracle、tidb的一些数据,打到中间的mq,然后通过flink写到iceberg的v2表里面。

这样做的优势在于:

  • 首先我们可以对cdc数据进行近实时分析。

  • 另外,iceberg可以实现支持flink引擎,因此我们可以进行流式消费,在这一块上我们开发了对v2表进行flink流式消费的支持。

  • 第三,可以同步变更schema,比如上游的mysql schema变更了,我们可以在这个链路中将schema变更同步到iceberg。这样,用户不需要去关心整个链路的变动,直接去取下游的iceberg表就可以了。

  • 第四,使用iceberg来替换一些专有型的数据库,价格更便宜。比如我们之前有些链路使用的是kudu,我们在某些适应场景用iceberg来替换它,成本更低。

我们在数据入湖之中也遇到了一些问题,主要是像mysql这类以自增id为主键的数据入湖。我们给用户提供两种数据分区的方式,种是Bucket分区,第二种是Truncate分区。在以自增id为主键的场景我们更推荐用户使用Truncate分区而不是Bucket分区。

Bucket分区示例如下,即分桶的形式,比如有4个桶,当自增id来了之后,相对来说可以比较均匀的分布。然而这样也就带来一个问题,因为merge on read的性能比较差,需要进行异步的compaction,可能就需要对所有的都进行compaction。另外在当前这个iceberg版本当中,不停止上游作业的情况下v2表的bucket分区是不可以修改的。这样随着数据量的增长,由于我们的分区不能变,可能会导致分区数据量变大,查询性能就会越来越差。

而在truncate分区下,我们可以看到,它的分区是可以扩展的,而且由于基于自增id,数据写入和compaction只会集中在某几个分区。所以在这种场景下我们更建议用户使用truncation分区。

以下是我们数据入湖的一个简单产品化的页面,有一个schema的对应关系,左边是mysql,右边是我们的iceberg表。在这个示例中,用户没有选择使用自增主键,而是选择了一个自己的业务id(order_id)来做key。

第二个iceberg常用的场景就是日志入湖。选用iceberg优于之前的hive的原因在于以下几点:

  • 是隐式分区避免了在凌晨的时候出现数据漂移问题。

  • 第二点是隐式分区带来的特性,延迟的数据我们可以将它落入到正确的分区,这点是用户非常关心的,在小米电视设备上的一些打点上报的数据的延迟情况是非常严重的。如果用户选择以前那种方式而不是数据湖,可能会导致近几个分区错误,可能就需要去搞另外一个表把这些数据重新存一遍,那么下游也需要再重新算一遍。隐式分区就可以帮助解决这个延时数据正确的问题。用户只需要对下游重新算一遍就可以了。

  • 第三点是flink的exactly once以及iceberg事务性能够保证数据的不丢不重。

  • 第四点是在我们的日志场景下也可以支持schema同步变更。

这是我们内部的一个日志入湖场景下的产品化。我们在后一行加了个时间分区的映射,用户可以选择Talos(Kafka)的一个记录时间,也可以选择以一个实际的业务时间来做时间分区。这样用户在数据入湖时就有更多选择。

刚才我们提到了compaction,为了维护上层的作业正常,我们在后台有这样三个服务,这三个服务都是以spark的作业形式来运行:

  • 个是Compaction服务,用于合并小文件。对于v2表来说,除了合并小文件,也可以对数据的delete文件进行提前merge。

  • 第二个是Expire snapshots服务,用于处理过期的snapshots。如果snapshots一直不清理,那么元数据的文件就会越来越多,这会导致膨胀问题。因此需要服务定期做一些清理。

  • 第三个是Orphan Files clean服务,由于一些事务的失败,或者一些快照的过期,导致文件在元数据文件中已经不再引用了,需要定期清理掉。

以上两个产品基本上可以覆盖数据集成的所有场景,无论是cdc数据还是日志数据都可以灌到iceberg上。下一步就是想让用户去做一下技术架构的迭代,推动从Hive升级到Iceberg。在这一块其实也遇到了很多问题,主要问题是用户对这个新技术的接纳性并不是很高。

后来我们调研了Parquet+ZSTD的技术方式。除了iceberg本身的优点,Parquet+ZSTD的方式还可以节约成本,这个是用户比较关心的优点。如下可以看到当我们切换到Parquet + ZSTD之后,TEXT数据可以压缩节约80%,因为TEXT数据本身是没有压缩的,因此这个效果比较好。像一些通用的SNAPPY+Parquet,也可以节约30%的存储。当然,我们现在选择的ZSTD级别是国内比较流行的level3级别,这个是在压缩效率和压缩时间上都比较合适的一个level。如果我们选择更高的压缩率,可能会导致压缩时间更长,这可能在用户作业中是不可接受的。所以我们也提供可以在 Compaction 阶段配置更高的压缩级别的选项,供有需要获得更高的压缩率的用户自行选择。

为了方便用户从Hive升级到Iceberg,我们也做了一个产品化。

  • 步会生成一张和hive表 schema 结构相同的iceberg表,并将历史数据拷贝过来,这里选择拷贝历史数据而不是引用原文件是因为这样可以对历史数据进行压缩,降低存储成本;

  • 第二步是对上游的写入作业做一个升级;

  • 第三步将下游作业也进行迁移。

03

基于Iceberg的流批一体的探索

介绍完当前场景后,我们更进一步基于iceberg,做了一些流批一体上的探索。

我们现在的架构是基于常规的Lambda架构,如下的这条T+1链路,在ODS层算完之后,每天凌晨使用Spark或者MR去计算。有时用户有实时的需求,比如一些实时大屏,就需要用Flink+Talos(Kafka)来搭建一条实时链路,这样用户就需要维护两条链路。在实时链路上提供时效性,在离线链路上提供准确性。在hive离线链路上,如果数据错误可以做回溯,离线入湖上也可以支持查。

但是也存在一些缺点,比如在实时链路上, Talos/Kafka目前没有办法做一些OLAP查询,由于它不能存储全量的数据,它的回溯处理能力也有限,这样我们就需要维护两套代码和两套存储。也要面临到实时离线数据不一致的情况。

下面是我们对iceberg批流一体的建设,我们会将存储层,在ODS, DWD, DWS层全部换成iceberg。这样的好处是可以在存储上实现统一,不需要Kafka和Hive两套存储。如果将离线链路引擎切换成Flink,可以在Flink上实现计算引擎的统一。

另外,我们也可以做一些回溯,在一些开放的链路中提供一些实时查询。基于iceberg的v2表我们还可以去构建一个变更流。从业务角度来看,也是一个不错的实现方式。

我们经常提到,在批流一体中为什么需要一个离线作业来修数据?除了回溯数据导致的数据问题外,还有以下几个原因:

  • 个就是Flink状态过期了,由于内存等一些原因,状态不能一直保存,Flink的状态过期了导致一直没能join上。

  • 第二个是因为一些窗口的设置或者watermark的设置导致数据延迟数据丢失。

  • 第三个就是Lookup join维表完成之后,维表又发生了一些变更。

对于离线的修正,目前一般用overwrite覆盖分区的方式,overwrite的语义是将原来的分区删除掉,然后追加进去新的数据。在spark,flink和iceberg的结合中,有一个merge Into的语法。它的语法简单如下,merge into到一个目标表,我们选择一个数据源,然后将数据源的数据merge到目标表中,需要一个on关键字指定连接词进行连接,在这里也可以指定一个目标表的分区来只对指定分区进行变更。在 merge into 语句中可以进行下面三个操作:如果我们join上(match)我们可以对目标表数据进行一个删除,或者是一个更新。如果没有match上,则可以执行插入操作。

这两种修复数据的方式有它们自身的一些特点

Overwrite是分区覆盖的方式,相对merge Into来说其自身语法简单,性能也比较好,不需要进行join操作,但缺点是使用overwrite去覆盖历史分区的时候,下游的实时作业还在跑,如果读到了这部分数据可能导致下游的实时数据出现波动。

当使用merge Into的模式写入,可以实现增量的更新,只更新变更的记录。但语法比较复杂,并且需要做join,其性能比overwrite要差一些,但它的优点就是下游只会消费到一些变更的数据,对下游的影响比较小。

下图是用Merge Into去做修正的链路。上层的hive表或者iceberg表通过merge into的语法去增量的更新,这些增量更新的数据会通过flink sql对它做一些变更更新到下一层(MySQL)。这条链路还可以做增量同步。比如将Hive的一些数据变更增量同步到Mysql。如果我们全量同步写入MySQL的话会造成MySQL的一些波动。使用merge into写到iceberg里面,然后将iceberg的变更增量同步到mysql。

因为iceberg的隐式分区的特性,会带来一些分区上的选择。比如在构建这个链路中,一般如果以天为分区的话,会有两种选择:

  • 种是使用处理时间作为分区,这样的话用户可以将实时的数据落到当天的分区,也可以离线的去修昨天的数据(overwrite或用merge into去修)。这样做的优点是实时和离线数据没有交集。

  • 另外一种是选择一些事物的时间作为分区,比如常见的一些订单的创建时间做为分区。这样,当我们在变更的场景下实时的写入数据会操作多个分区,离线修复全量数据也会操作多个分区,带来的一个问题就是实时和离线处理的数据存在一个交集,在 iceberg 中处理的数据有交集则有可能出现提交冲突导致作业失败。

Merge Into通过隔离级别处理有存在交集数据(事务)带来的问题。

在Merge Into实现上,每个引擎对不同的语法有不同的实现,它提供了两个隔离级别。

  • 个隔离级别是高的隔离级别,“可序列化隔离级别”,它也是默认的隔离级别。如果配置这个级别,在merge into事务提交的过程中,如果有其它已经提交的事务和本次事务操作相同的文件,那么这个作业就会失败掉。这种情况下用户作业可能会经常失败,我们可能需要一些其它的办法,比如我们在实时上去做一个过滤只处理当天的一个分区,把其它的历史分区交由离线去做修正来避免这些冲突。

  • 第二个是“快照隔离级别”,在快照隔离级别下,在我们merge Into提交的时候,会提交的一些冲突的事务给覆盖掉,此时如果有多个作业同时去写,它们到底是哪一个提交成功,这种情况是不可预知的。这导致我们结果其实也不准确。这两种问题也是当时我们设想过的使用这两种隔离界别都会存在一些问题。

下图是一个正式应用中使用的批流一体链路,图中红线的流程是初始化数据的阶段,可以使用 spark 或者 flink。

当初始化完成之后,就可以把streaming作业跑起来,一般来说从ODS到DWD这一层,如果只是做一些简单的数据清洗逻辑,这块并不需要去做修正,当在DWD到下一层的时候,可能需要做一些修复,这里就选用spark merge into对DWM层做一个修复,完成之后这个streaming作业把包括当天实时的以及一些修正的数据都写到下一层,比较方便。

以上就是我们对批流一体的探索。

04

未来规划

后介绍一下我们对未来的规划。

  • 对Flink CDC 2.0持续跟进。Flink CDC2.0当中对全量和增量的切换比较友好,而我们现在的实现方式是参考hypersource的方式来做切换的。

  • 优化治理compaction 服务。当前iceberg做compaction都是在后台去运行的,这样每个表(尤其是where表)都需要起一个作业,如果多了的话会有资源占用的问题。

  • 跟进iceberg和flink1.14的结合。当前我们的flink是1.12的版本,它的source设计没有那么的稳定,我们去读iceberg的时候经常会遇到一些反压,我们会在后面去跟进一下flink1.14和iceberg的结合。

05

问答环节

Q1:Iceberg数据如何把数据回滚另一个snapshot版本?

A1:Iceberg有一个语法(rollback)可以回滚到指定snapshot。

Q2:Upsert生成过多小文件导致合并时候OOM?

A2:因为生成equality file量太多,在我们实践当中也经常会遇到这种问题,其中一方面需要解决冲突问题,另一方面增加策略,条件判断等,提高Compaction调度次数(如每10分钟调度一次)。

Q3:Upsert基于分区还是基于主键来更新? 

A3:Upsert是基于主键来更新的,但是要求分区必须在主键中选择。比如我们基于id更新的话,分区只能以id来做,不管bucket分区还是truncate分区。

Q4:如果需要按月年做财务账单数据,那么flink如何读取数据?

A4:跟实际业务有关,flink 流式消费读到的都是增量数据,需要进行一些窗口的运算。

Q5:Iceberg成熟度是晚于Hudi的,因此hudi的成熟度也不会比iceberg差,那么为什么要选用iceberg做数据湖技术?

A5:我们是在去年5月份开始调研,当时iceberg的版本是0.10,hudi版本在0.9左右,还有没有正式release。之所以选择iceberg是因为iceberg要比hudi早一点支持flink。因为我们用户比较看重flink流式的场景。当时的hudi版本还没有完全和spark解耦以及支持flink。如果用户在做具体选择时候可能跟自己的业务场景来做判断,比如是一些日志场景还是变更数据的场景,然后可以参考社区的活跃度以及各个大厂的使用情况,这是当时我们的一些评判方法。

Q6:现在小米落地的iceberg表是v1还是v2?

A6:都有,如果是变更数据处理v2表比较多,日志的话v1比较多。

Q7:现在小米只有iceberg和hive,然后从hive切换到iceberg的量大概有多少比例?

A7:从hive切换到iceberg的比例不是非常高,跟历史原因有点关系,像iceberg对离线spark支持的低版本是2.4,而我们内部的spark版本有2.3,对用户的jar作业切换比较困难。因此在新场景新业务接的比较多,然后也在做产品化来推动hive到iceberg的迁移,但是过程比较漫长,需要业务更新技术栈,比如从MR/spark2.3切换到spark3.1。

Q8:在业务场景用Iceberg究竟可以解决什么问题,有哪些收益?

A8:这个也是我们跟用户分享中用户反馈的。我们讲的iceberg事务性用户并不是很关心。有几个点是用户比较关心的:我们可以解决ods层数据准确性问题;第二隐式分区可以解决延迟数据问题;第三用zstd压缩方式可以降本增效,吸引用户切换到iceberg。

Q9:大数据量下upsert性能如何?

A9:当前 upsert 写入不会成为瓶颈,但 compaction 可能会比较慢。大批量数据离线入湖还是建议先去重然后再 merge 写入;如果是流式入湖则可以增大二级分区。

Q10:数据是如何存储的,存在哪里?什么格式?

A10:目前都是存储在hdfs上,存储格式很多,有parquet,还有text格式的数据。换用 iceberg 后统一使用 parquet 来存储。

Q11:跟alluxio比较有什么差异?

A11:iceberg是上层对文件的一个表抽象,alluxio可以做文件的缓存,提高查询的性能。

Q12:如果更新过于频繁,如何解决文件数量的膨胀?

A12:1. 增大flink的checkpoint间隔,可以降低文件数量,2. 文件数量跟分区有关系,选择合适的分区也不会产生小文件问题。

Q13:Iceberg/Hudi都缺一层缓存,未来如何考虑?

A13:iceberg、hudi这两个都是文件抽象层,都是可以对alluxio做支持,社区都有在跟进

Q14:底层支持clickhouse吗?

A14:目前是不支持,不过可能会基于 iceberg 建外表。

Q15:Iceberg和olap的StarRocks有什么关系?

A15:这两个还是在场景使用方面,Iceberg更偏向于数仓中构建的pipeline的链路,StarRocks更偏向多维分析的场景,在数仓设计中更靠上的一层。

Q16:为什么在流批一体的flink链路中,在dwd层加了个spark任务?

A16:因为目前社区版本只有spark支持merge into语法,我们内部其实实现了flink的类merge into语法,另外也可以使用flink的overwrite来更新。

Q17:业务表入湖数据表结构变化是怎么实现的?(mysql schema变化到iceberg schema变化的问题)

A17:我们实现了一个定制化作业,去捕捉ddl信息,对iceberg的schema做变更,对于talos(kafka)的ddl我们捕捉不到,我们采用的是用上游消息的schema去替换iceberg的schema。

Q18:如何低延迟读取数据,数据延迟情况?

A18:分钟级别,flink构建的链路依赖flink的checkpoint配置,我们建议用户设置5分钟延迟,也可以3分钟,低不能低于1分钟。

相关文章