实时数据湖 Flink Hudi 实践探索
分享嘉宾:陈玉兆 阿里云 技术专家
编辑整理:徐纯根 天翼云科技公司
出品平台:DataFunTalk
导读:首先做个自我介绍,我目前在阿里云云计算平台,从事研究 Flink 和 Hudi 结合方向的相关工作。目前,Flink + Hudi 的方案推广大概已经有了一年半的时间,在国内流行度也已比较高,主流的公司也会尝试去迭代他们的数仓方案。所以,今天我介绍的主题是 Flink 和 Hudi 在数据湖 Streaming 方向的一些探索和实践,将会围绕以下四点展开:
01
Apache Hudi背景介绍
首先和大家分享下数据湖发展的历史背景,以及Hudi的基本特性。在我个人观点看来,传统的数仓方案(如 Hive)其实本身也是数据湖,而且我会把Hudi、Iceberg、Delta Lake 都看成是数仓下一代新的解决方案,而不仅仅只是一种湖格式。那为什么近一年来会有数据湖这一新的数仓形态的诞生?伴随着目前云存储(尤其是对象存储)逐步成熟的大背景,数据湖的解决方案也会逐步往云原生靠近。如图一所示,湖格式会适配云厂商的对象存储,做云厂商多云和云厂商用例,同时适配比较流行的大数据计算框架(如Spark、Flink),以及查询端的 Presto、trino 以及传统 Hive 引擎,因此诞生了这样一套新的数仓解决方案。
由上可知,Hudi 作为下一代的数仓解决方案,借助上下游的计算和查询引擎,实现替代传统 Hive 离线数仓的一套新方案,其核心特色整体可以总结为以下四点:
方面,上游支持多种数据源格式。比如传统数据库的 change log 日志、消息队列 log 等传输方式,都会在 source 端会有非常丰富的支持。 第二方面,下游查询端也同样支持多种查询引擎。像主流的 OLAP 引擎 Presto、国内比较火的 Starrocks、云厂商的 amazon redshift、数据分析产品 impala,都会对接到这样一套数仓架构里面。
Hudi 对事务的支持程度,会比原来 Hive 数仓的要求更高,更丰富。其中核心特点是支持在文件存储布局上做更新。在传统基于 Hive 的 T + 1 更新方案中,数据重复度会比较高,只能实现天级别的数据新鲜度。并且伴随着业务需求越来越复杂,实时性要求越来越高,对数仓存储体系提出了更高的要求,对端到端的数据新鲜度要求做到分钟级或者是秒级。其次是更新效率要求提高,不要每次都去 overwrite 整张表或者整个 partition 去更新,而是能够到文件粒度的局部更新来提升存储和计算效率。Hudi很好地满足了这些需求,因此,对ACID语义的增强是这套数仓架构的第二大特点。
在我看来,第三个亮点是在ACID语义基础上衍生出来的增量处理,尤其Hudi提出的TimeTravel概念,或者直接对接Flink,Spark Streaming等流式处理引擎的方式,不管是近实时还是常驻的Streaming服务,本质都是一种流式消费,都可以理解为一种增量ETL处理。相对于传统batch调度,在计算上会更加高效,尤其像Flink这种有状态的计算框架,会复用之前的计算结果,直接实现端到端的全链路增量处理。其次,在数据新鲜度上有一个数量级的提升,从“天级别”提升到“分钟级别”。 比方说,国内目前有些实践用户会尝试使用Flink计算框架做湖表的Streaming消费,直接通过一套增量ETL链路去分析从源端注入过来的数据,构建传统数仓的分层。还有一点,很多小伙伴会好奇TimeTravel这种incremental的查询设计,查询两个快照之间的增量数据,有什么用途?如果你是批形式调度的查询,主流场景是ADS端到下游的同步,比如说将数仓的生产结果同步到其他库表(如ES,Mysql)可通过这种TimeTravel定期做这种批量同步,当你对ADS的同步时间度要求没这么高,就可以用这种幂等TimeTravel的查询方式比较高效地同步到其他下游端。以上三点,是相对于主流Hive架构地三个核心区别,也是目前国内外湖仓项目正在努力的方向。
再补充一点,在Hudi里面,会尽量优化文件布局,将小文件管理这种数据治理的方案做到框架内部,实现智能化调度。这是Hudi区别于其他数仓方案如Delta Lake,IceBerg的核心特点。1. Hudi写入pipeline(多算子组成的微服务架构)从图二中可以看出,Hudi写入pipeline是一个Serverless的微服务架构,核心是在整个pipeline的服务起来之后,不管是Flink,还是Spark Streaming,整套服务可以对表本身能达到自治理的状态。所以,不光光考虑数据高效写入,同时还需要考虑写入过程中的文件管理,尽量避免产生太多小文件从而优化查询端的效率。通过定期的文件合并,文件清理,避免出现小文件数量爆炸式增长的情况出现。另一方面,是ACID事务性,尤其是完成一个待更新的ACID事务,需考虑多方面因素。当单job或者单节点要fail over时,Hudi可以保证快速找到之前写入的错误数据,并且实现rollback回滚。所以,Hudi的事务层支持是目前三个湖存储里面做的完善,高效的。以copy on write的具体实现为例,会将上游的SQL原生数据结构转换成Hudi的数据结构,为了支持并发写入,我们会对每个shuffle后的数据分bucket。个是新增数据,会尽量写入到当前已存在的比较小的bucket里面。同时,为了避免生成小文件,也会尽量保证每个bucket的大小和预期大小相同。第二点是更新数据,Hudi设计了key主键,每个key的消息都维护在一个bucket内部,每次更新都会写入到之前的bucket里面。而IceBerg,Delta Lake就只管写入,不会去管文件布局,因此他们会把查询端的一些合并和清理做得很重,所以查询效率会比较低。相比之下,Hudi复杂的写入过程和bucket策略就是在权衡和考虑读写效率。这里所说的bucket概念,有点类似Snowflake里的micro partition概念,会在传统的partition分区下面再细化,以文件粒度来维护某个range下消息的生命周期。以更细粒度去维护生命周期,可有效提升数据更新和查询效率。第二个算子之后,数据根据每个bucket做好分区,我们会按照bucket ID做一遍shuffle,交给write task去写入。为何要按照bucket ID重新shuffle?主要是为了维护两个write task不能同时并发修改一个bucket的更新语义,否则容易造成更新冲突。所以,从整体上看,这三个算子可以高效保证并发写入、更新。可以比较明显看到,第二个算子的并发度其实决定了整个更新的并发度,决定当前能够同时更新和写入的bucket数量,而后面的算子可以自由独立地扩展。从实践经验推荐第二个和第三个算子的并发设置一样,当吞吐量不是很高的时候,一个bucket交给一个write task去写,吞吐量比较高的时候,可能一个bucket的write task可能会分多个,可以调整到1:2的比例。后台还会启动clean commits的清理任务。数据commit操作发生在coordinator组件内,保证每个write task的commit大概对齐了checkpoint之后,数据才会flush出去,并且有一部分元数据信息,会统一提交给coordinator,coordinator收集到统计信息之后,会去结合checkpoint完成的事件做一次提交,真实的提交是在coordinator内。当coordinator完成提交之后,Hudi表会发起一个新的事务,只有当write task看到这个新的事务,才能够发起新事务的写入动作。所以中间存在一个异步等待的过程,类似于一个小型的状态机。而Flink的快照所保证的语义其实是一个best effort语义,一旦收到某个checkpoint的成功事件,就标志前面的状态都是成功的,但中间可能存在checkpoint被abort情况。因为Hudi需要保证每个写入的完整性和Exactly once语义,就需要考量中间的写入不能越界,比如说checkpoint的事件数据不能写入下个checkpoint,这样Exactly once语义就没办法保证。在0.11版本会尝试做一些优化,比方说checkpoint被abort之后的状态能否复用。里面涉及一个状态的切换,相对会比较复杂。不像Spark Streaming每次都是微批的抽象,每次先发起一个任务,天然保证了exactly once,容错语义交给框架。Flink怎样把这个异步算法和很强的exactly once语义结合在一起,是这套架构的一个难点所在。
图三:Hudi小文件策略
接下来,我们仔细看看文件写入的第二个算子bucket assign的具体决策。即新消息如何去选择放到哪个bucket,如图三所示,分两种情况介绍。首先,左侧框图中有三个bucket,蓝色代表当前已经存储文件的大小,如果是insert数据,策略是每次选择当前剩余空间多的bucket写入。为何不考虑选择剩余空间少的bucket呢?因为需要考虑到COW的写放大问题,效率比较低。更新数据时,先找到维护当前key的bucket,然后写入。这样并不会造成文件大小的无限增长,因为每个record记录更新前后的大小基本近似,文件大小不会有明显的变化。影响文件大小的主要是insert数据,文件大小会设置阈值,维持在120M左右。图中右侧框图是一个比较极端的情况,两个bucket只剩下很小的写入空间,考虑到写放大影响,会重新创建一个新的bucket重新写入。为了提高并发写的吞吐量,会给每个bucket assign task分配一套独立的bucket管理策略,并利用Hash算法把bucket ID以固定的规则hash到每个bucket assign task 下面,做到了并发决策。因此,控制bucket assign task并发度就相对控制了写入小文件数量,在写入吞吐量和小文件之间的权衡。
图四:Hudi 全量和增量读取策略
介绍完数据写入过程,再看下数据读取的流读部分。流读的全量读和增量读是如何实现的?如图四所示,Hudi中TimeLine保存每个事务提交的毫秒时间戳,每个时间戳会对应一个快照版本,会记录在元数据里面。全量读时会扫全表的文件,会把整个全表的文件扫描出来,当你没有配置内置的Metadata索引表时,会直接扫全表,把文件系统中所有的文件都找出来。如果启用了Metadata表,就会在Metadata表(KV存储)里扫描这个文件信息,以相对比较高的效率扫描全表文件,然后发给下游,并且增量的部分会定期(默认60s)监听扫描TimeLine观察有没有新的commits,同步发给下游读写,每次增量的部分会基于上一次下发的时间线点位,然后一直查找到当前新的commit time。Split mornitor算子负责维护这样一套监听增量文件信息的规则,下发给真正执行读取的task。近在master版本也支持了批模式的TimeTravel查询(某个时间段的点查),以前的版本虽然支持但是会有些问题,比如增量部分meta文件如果被archive、或者被清理,数据完整性就没有保证。新版本在保证在读取效率前提下,通过实现两个快照、commit之间的批模式增量读取方式应对这两个问题,保证数据完整度。目前Flink + Hudi在国内已经是非常流行的技术架构,这边总结三个应用场景向大家介绍一下。
图五:DB数据入湖
这套架构的DB数据入湖入仓核心特色是把原来T + 1的数据新鲜度提升到分钟级别。数据新鲜度通过目前比较火的以Debezium、Maxwell为代表的CDC(change Data Capture)技术实现。以Streaming近实时的方式同步到数仓里面。在传统的Hive数仓中想保证实时是非常困难的,尤其是文件更新,湖表实时写入更新,基本不可能实现。CDC技术对数仓本身存储是有要求的,首先是更新效率得足够高,能够支持以Streaming方式写入,并且能够非常高效的更新。尤其是CDC log在更新过程还可能会乱序,如何保证这种乱序更新的ACID语义,是有很高要求的,当前能满足乱序更新的湖格式只有Hudi能做到,而且Hudi还考虑到了更新的效率问题,是目前来说比较先进的架构。图五下方的方案相比上面的方案,比较适合目前体量比较大(每天增量能达到亿级别地)、数据平台比较健全的公司,中间有一套统一的数据同步方案(汇总不同源表数据同步至消息队列),消息队列承担了数据的容错、容灾、缓存功能。同时,这套方案的扩展性也更加好。通过kafka的topic subscribe方式,可以比较灵活地分发数据。
图六:近实时OLAP
第二个场景是近实时的OLAP场景,分钟级别的端到端数据新鲜度,同时又非常开放的OLAP查询引擎可以适配。其实是对kappa架构或者是原先Streaming数仓架构的一套新解法。在没有这套架构之前,实时分析会跳过Hudi直接把数据双写到OLAP系统中,比如ClickHouse、ES、MongoDB等。当仓存储已经可以支持高效率分级别更新,能够对接OLAP引擎,那么这套架构就被大大简化,首先不用双写,一份数据就可以保证only one truth语义,避免双写带来数据完整性的问题。其次因为湖格式本身是非常开放的,在查询端引擎可以有更多选择,比如Hudi就支持Presto、trino、Spark、Starrocks、以及云厂商的redshift引擎,会有非常高的灵活度。所以,这种近实时的OLAP架构,总结就是以下两点:①统一上游存储端;②开放下游查询端。但这套架构的数据新鲜度大概是5分钟级别,如果要做到像kappa秒级别的架构的话,目前Hudi还是不太适合的,因为本身比较依赖Flink的CheckPoint机制(支持端到端的exactly once语义),所以不能做到高频次的提交。
图七:近实时ETL
第三个场景是目前比较前沿的架构,在国内也慢慢开始尝试这套架构。当数据源数据体量本身不大的时候,比方说源头过来的并不是kafka,可能源头只是一个Mysql的binlog,QPS每秒可能也就几百。那么这套架构是一个非常稳定且省事的架构,不光光是实现了这种端到端的增量处理,同时还解决中间数据入仓的需求。其实就是提供了两套抽象,首先承担了一个数仓中间存储的一个存储抽象,把数据直接以湖格式入仓;第二个是提供Queue能力,类似于Kafka这种消息队列的能力,可用Streaming方式增量消费,并且可以在其上做一些增量计算。就这一套架构直接统一原来的Lambda和kappa架构,就是kafka的存储抽象加数仓文件的存储抽象合并在一个存储抽象里面,同时没有增加过多的存储成本。大家可能后面用的都是对象存储或者是以廉价存储的形式存在的HDFS。整套架构解决了两个问题,是双写问题。在Lambda架构下,数据先写kafka,然后入仓,保证这两份数据的一致性语义比较难。而且kafka开启exactly once写入后吞吐量会下降很多,Kafka和HDFS之间的数据如何保证一致性呢?有人会理解去流读kafka,把kafka数据再起一个job同步到HDFS,这样计算资源、维护作业、同步成本都是原来的两倍。第二个解决的问题是中间层查询需求。中间层数据直接入仓,并且不是以高效率方式更新,当需要对中间层DWD表做一些join操作时,可以直接和引擎对接,而不需要去考虑说Lambda架构T+1更新效率的问题。湖格式分钟级时效性很大程度缓解了这个问题。除此之外,这套架构还有个好处,可以根据不同地应用场景选择丰富的OLAP查询引擎,直接以外表的方式接入库存储,很方便地进行OLAP分析。
图八:VVP作业图九:VVP DLF
接下来,简单讲一下目前阿里云VVP产品实时入湖的集成,主要还是入湖状态,阿里云内置Flink版本会有一个内置Hudi connector,大家可以通过FlinkSQL方式快速构建入湖任务,直接写湖表,对接Hudi CDC connector或者对接kafka CDC format,实现数据快速入湖。并且在入湖过程中,提供了商业版特性,如schema evolution。CE,CTAS语法支持schema evolution然后同时我们会主推DLF catalog元数据管理组件,DLF catalog会和EMR的DLF无缝集成,若是EMR通过spark写入,这边也可以看到,Flink入湖任务写完之后也可以管理,通过DLF组件,可以直接通过EMR查询端引擎分析Hudi格式数据。这是目前的一套推给商业化用户的技术方案,入湖通过VVP服务,分析通过EMR,后期VVP可能会集成更多能力,如流批统一,满足用户的流读需求。
图九:Hudi RoadMap
如图九所示,我将简单介绍下近期Hudi 0.12版本以及1.0版本将会做的一些feature。首先,我们会推出类似于Delta2.0的CDC Feed功能,因为目前我们支持的CDC要求输入必须是一个CDC,Hudi会用CDC格式把它存下来。CDC Feed的区别就是不用保证整体的输入是CDC格式,即使出现absert语义,或者CDC的中间数据有丢失,可以完整还原出CDC给主端。这个特性在读写吞吐量和资源上做些权衡,不会像目前这套架构处理CDC那么高效。第二点是Meta Service服务。把元数据的管理插件化,通过统一的Meta Service plugable形式,统一管理Hudi上的表和任务。第三点,我们目前还在规划做Secondary Index二级索引。因为在目前的master版本中,Flink Spark都已实现data skipping能力(在写入时,如果用户开启Meta Data表,同时开启data skipping,会额外记录每个column的统计信息),典型的是每个column会建一个Max, Min,开启一个元数据的加速,提升文件级别的查询效率。后续还会支持类似于数据库的二级索引,为某个专门的column实现类似于LSM的抽象索引,构建适用于点查场景的高效索引方案。后,后续我们还会做类似于特征工程的按列更新功能开发,类似于Clickhouse的Merge Tree抽象,独立存储某个column。因为在机器学习的特征工程中大量特征需要成千上万个字段,每次生成一个特征都需要更新一个column,所以要求单个column要具备高效的更新能力。为了适配这样的场景,Hudi也会持续去探索。Q1:直接使用Hudi存储更新,相比直接CDC到Starrocks,这两种方案哪一个更好?感觉Starrocks的QPS应该会高于Hudi的更新速度。A1:的确是这样,因为Starrocks service 会使用类似于LSM的高效主键索引,并且内存里面做partition策略,维护比较多的二级索引,元数据信息。而且,重要的一点就是在写入更新main table时会使用攒批的操作,先把多次写入汇入buffer然后进行一次flash,并且在数据的flush上也可以攒批,这也是为什么Starrocks更新效率上更高的原因。但同时,因为有了server集群,会带来两个问题,首先,会带来较高的运维成本,其次,内存模式相比于Hudi的serverless格式的开销会更大。Hudi格式在开放性上,对于Starrocks会有一定优势,不光可以对接Starrocks、还可以对接Presto、Spark等主流OLAP引擎。这就是他们的区别,两种方案的侧重点不同,还需要根据实际应用场景进行选择。如果只是做OLAP应用,Starrocks更适合。但如果想构建数仓,使用Starrocks代替Hudi的成本应该太高了。因为Hudi面向的场景主要时数仓,迭代的主要时Hive传统数仓,更有优势。Q2:流量数据入湖场景下,使用MOR(Merge on Read)表,还是COW(Copy on Write)表更合适?A2:如果流量数据体量比较大,建议使用MOR表。以目前的实测方案,QPS不超过两万,COW表还是可以支撑的。超过两万之后,比较推荐MOR online compaction方式。如果QPS更高,那可能需要把压缩任务再独立出来,这是目前能给到的一个方案。今天的分享就到这里,谢谢大家。