Hive SQL迁移Spark SQL实践

2022-04-11 00:00:00 数据 集群 执行 迁移 资源


前言



信也科技的数据团队从2018年开始,尝试使用Spark SQL替代Hive执行离线任务。在不影响生产任务的前提下,近四个多月做了集中迁移, Spark SQL任务占比提升至30%,消耗CPU和Memory资源占比不及总集群资源的1/4,平均任务耗时减少58%,任务完成时间提前超过2.5小时,核心表SLA达成率显著提升。在整个Spark SQL迁移过程中,不仅积累了两个计算引擎的差异问题和迁移流程的踩坑实践,也收获了处理经验和优化建议。




迁移背景



  • 随着业务需求迭代变化和数据量的膨胀,维护SLA遭遇挑战

Hive SQL处理任务虽然较为稳定,但是时效性在SQL逻辑已经优时就到达瓶颈,无法再进一步提升,复杂计算流程常常需要好几个小时,随着业务需求迭代变化和数据量的膨胀,Hive已经无法完全满足目前对SLA的支持。


  • 多元化促使数仓不得不进行能力扩展

过去的传统数仓只做批量作业处理,但现在的大数据数仓已是一个很宽泛的概念,除了批处理,还需要处理准实时数据、实时数据、图计算、机器学习。


  • Spark SQL的发展远超HSQL的停滞不前

随着 Spark 以及其社区的不断发展,Spark 及 Spark SQL 本身技术的不断成熟,Spark在技术架构和性能上都展示出Hive无法比拟的优势。




Spark是当下离线数仓的佳选择



Apache Spark在其作为开源项目的第10周年发布了3.x系列的个发行版。自2010年横空出世以来,Spark这个大规模数据处理而设计的快速通用的计算引擎,已用于大数据处理、数据科学、机器学习和数据分析工作负载,现形成一个高速发展应用广泛的生态系统。


Spark SQL是支持Spark应用程序多的引擎,例如在Databricks公司,研究发现超过90%的Spark API使用DataFrame、Dataset、SQL APIs和其它库,它们都被SQL优化器优化过的。这意味着即使是Python和Scala开发人员也会通过Spark SQL引擎完成他们的大部分工作。在Spark 3.0版本中,所有修复的46%是贡献给了Spark SQL,同时在性能和ANSI标准兼容性上得到改进。在TPC-DS 30TB基准测试中,Spark 3.0 性能大约是Spark 2.4的2倍。



社区贡献很快将Spark扩展到了不同的领域,提供了流、Python和SQL等新功能,这些模式现在构成了Spark的一些主要用例。Apache Spark3.0通过显著改进对SQL和Python的支持(目前Spark使用广泛的两种语言)以及对Spark其余部分的性能和可操作性的优化,延续了这一趋势。




迁移过程



一、方案制定


在迁移Spark SQL开始之前,集群的情况是现有生产环境部署复杂,各个子系统关联繁琐,平台没有足够的物理资源和人力资源搭建另一套Hadoop集群以支持迁移工作,当然维护和时间的成本成倍增加,对不同的团队来说都是不可接受的。

经过讨论后,团队决定在同一个Hadoop集群分别使用Hive和Spark两个执行引擎,结果表和数据分库隔离。尽管对现行生产任务有资源抢占的风险,但这个解决办法小而美,不失为四两拨千斤的佳方案。



为了使迁移过程和结果不会有太大偏差,团队也制定了工作流程的要求和目标:

  • 测试任务需要双跑,相同的SQL在通过两个引擎计算之后应该得到相同的结果,保证数据的隔离和一致性。

  • 模拟早上集群生产环境的场景时,对现有真实任务的影响控制到低。

  • 对比HSQL和Spark SQL执行任务的资源差异,包括执行时间,CPU和Memory的使用,保证迁移获益。


二、Spark SQL工作内容


早期Spark SQL任务通过Spark-submit提交的方式启动任务,每次启动都是一个独立的spark application,所以每次要重新启动申请资源,相对偏重,会有一些时间损耗。后来,平台团队建议并搭建了另一个工具Spark Thrift Server,用于访问hive的数据。Spark Thrift Server 方式由一个 Driver 来管理用于执行任务的executor 池,并发的执行不同的 SQL 任务可以让YARN 资源利用率更高。无论启动多少个客户端(beeline),统一连在一个Thrift Server上,它都是一个spark application,后面不用在重新申请资源。用Thrift Server,在UI中能直接看到SQL的执行计划,方便优化。



SQL改写及准备工作:

  • 在测试库创建与目标表完全一致的测试表;

  • 复制原SQL,目标表的库名修改为测试库名;

  • 复制Shell调度脚本,Hive命令改为Spark命令;

  • 复制调度任务,修改Shell调度脚本,SQL脚本指向测试文件;

  • 在调度平台创建测试项目,部署双跑调度任务。


SQL双跑:

  • 手动执行双跑任务,任务成功后,配置时间开启双跑;

  • SQL任务双跑期间,检查目标表和测试表的数据一致性,观察两边运行时间和资源使用情况;

  • 双跑结束后,若无数据、性能或者其它问题,即可切换Spark SQL;

  • 抽取平台元数据,统计对比两个计算引擎所消耗的时间、core和memory资源。


三、挑战及应对


小文件问题

平台在集群配置了小文件合并参数,很好的缓解了小文件问题给带来的压力,但问题并没有完全根除,清理小文件的工作依旧定期执行。但是迁移Spark SQL过程中,发现小文件问题被放大了,默认的spark.sql.shuffle.partitions参数配置使得所有目标表的文件个数都成了200.
等到内部patch实现在Spark SQL中引入小文件合并功能之前,我们在SQL端寻找到了一个解决方案,就是在DML后加上distribute by来划分文件个数,既不影响中间过程,同时又支持HSQL和Spark SQL的语法,开发人员都开始有意识去关注小文件问题。


未升级的Spark ExternalShuffleService
Spark 的 Executor 节点负责数据的计算,还管理并读取shuffle过程中产生的数据。如果 一个 Executor 节点挂掉了,那么运行中的任务因为无法找到节点或无法处理 shuffle 的数据而失败。为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求,这个单独的服务叫做 ExternalShuffleService。
目前ExternalShuffleService是Spark 2的版本,所有超大任务无法在Spark SQL 3版本上运行,Executor节点挂掉后各种shuffle问题层出不穷,它们却依然稳稳跑在Spark SQL 2的版本上。待到ExternalShuffleService升级后,Spark版本终将统一。




迁移成果




经过4个多月的努力,Spark SQL任务平均耗时减少58%,任务完成时间提前超过2.5小时,用时间换空间的收益效果显著。


我们对整个迁移工作流程进行复盘,总结了以下Spark SQL运用得到的大成果:

  • 提升时效以便提供更好的数据服务。

  • 节省更多Hadoop平台集群资源。

  • 技术革新以支持业务场景多元化。

  • 丰富数据中台技术栈,以提升团队核心竞争力。




●未来计划●



尽管迁移工作取得一定的成绩,但流程中也存在问题有待未来去解决:

  • 复用SQL任务调度系统,Hive和Spark资源未隔离,造成一定的资源抢占问题。

  • 现行External Shuffle Service版本未升级至3.0版本,部分大任务容易因为Executor节点挂掉而导致任务失败。


团队计划Hadoop集群资源继续向Spark SQL的方向倾斜,任务占比达到70%,迁移对象还包括shell和python中所有使用hive执行的SQL任务,Spark终将成为离线SQL任务的主流引擎。其次,优化升级Spark External Shuffle Service,与Spark SQL版本保持一致,也是迁移过程中必不可少的一环。

相关文章