Spark内存模型介绍及Spark应用内存优化踩坑记录

2022-05-31 00:00:00 数据 内存 所示 占用 空间

Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解Spark内存管理的基本原理,有助于更好的开发Spark应用程序和进行性能调优。同时,有效率的内存使用是Spark应用高效性能表现的关键所在,不合理的内存使用就会导致Spark性能表现的很糟糕。

在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程。
Driver程序主要负责:
(1)创建Spark上下文;
(2)提交Spark作业(Job)并将Job转化为计算任务(Task)交给Executor计算;
(3)协调各个Executor进程间任务调度。

Executor程序主要负责:
(1)在工作节点上执行具体的计算任务(Task),并角计算结果返回给Driver;
(2)为需要持久化的RDD提供存储功能。

由于Driver的内存管理比较简单,和一般的JVM程序区别不大,所以本文重点分析Executor的内存管理。所以,本文提到的内存管理都是指Executor的内存管理。

Executor内存

Executor进程作为一个JVM进程,其内存管理建立在JVM的内存管理之上,整个大致包含两种方式:堆内内存和堆外内存,大致如下图1所示:

图1.Executor内存结构

堆内内存

指的是JVM堆内存大小,在Spark应用程序启动时通过spark.executor.memory参数配置。Executor内运行的并发任务共享JVM堆内内存。堆内内存大致可以分为如图2所示以下4个部分(以统一内存管理机制为例):

图2.JVM堆内内存结构
  1. Storage内存:主要用于存储Spark的cache数据,例如RDD的cache,Broadcast变量,Unroll数据等。需要注意的是,unrolled的数据如果内存不够,会存储在driver端。
  2. Execution内存:用于存储Spark task执行过程中需要的对象,如Shuffle、Join、Sort、Aggregation等计算过程中的临时数据。
  3. User内存:分配Spark Memory剩余的内存,用户可以根据需要使用。可以存储RDD transformations需要的数据结构。
  4. Reserved内存:这部分内存是预留给系统使用,是固定不变的。在1.6.0默认为300MB(RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024),不过在2.4.4版本中已经看不到这个参数了。

堆外内存

这里Off-heap Memory从概念上可以分为两个:

Off-heap Memory(*)

对应图1的Executor JVM内的off-heap memory(*),主要用于JVM自身,字符串, NIO Buffer等开销,可以通过spark.executor.memoryOverhead参数进行配置,大小一般设为executorMemory * 0.10, with minimum of 384。

Off-heap Memory(**)

对应图1的(**): 为了进一步优化内存的使用以及提高Shuffle时的排序的效率,Spark引入了堆外(Off-heap)内存,直接在工作节点的系统内存中开辟的空间,存储经过序列化的二进制数据。Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被地申请和释放,而且序列化的数据占用的空间可以被计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同如下图3所示(以统一内存管理机制为例),所有运行中的并发任务共享存储内存和执行内存。

图3.Executor堆外内存结构

统一内存管理机制

Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,如图 2 和图 3 所示,其中重要的优化在于动态占用机制,其规则如下:

  • 设定基本的Storage内存和Execution内区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
  • 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • Execution内存的空间被对方占用后,可让对方将占用的部分转存到硬盘如图4所示,然后"归还"借用的空间
  • Storage内存的空间被对方占用后,无法让对方"归还",多余的Storage内存被转存到硬盘如图5所示,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂

图4.Execution内存的空间被占用

图5.Storage内存的空间被占用

Spark应用内存优化

case 1

某线上Spark应用,executor memory配置如下:
spark.executor.memory = 124G
spark.executor.memoryOverhead = 10G
结果在一些大数据量情况下,内存超出报错如下图6:

图6.Spark应用OOM错误

这个问题从报错的提示就比较好理解,提高spark.yarn.executor.memoryOverhead的内存就ok了,终在我们这个应用上提高至20G成功跑过了任务。但是笔者调研发现,在有些别人Spark应用下,不管怎么提升这个值都没用。附上Stack Overflow讨论 exceeding-memory-limits。

case 2

同上某线上Spark应用(Spark版本1.5.1),运行时Storage内存如下图7所示。 经过代码走读发现,该业务实现中,对于同一份血统(Lineage)的RDD数据,每次完成一次Action计算后都会cache计算结果的RDD数据。就导致了同一份血统的RDD数据多次持续递进cache到内存的情况。由于Spark 1.6 之后才引入的统一内存管理机制,在某些数据量较大的任务下,就会导致内存溢出。 解决方法是在每一次cache计算结果的RDD数据后就手动释放(unpresis)之前同血统的RDD数据。优化后,运行时Storage内存如下图8所示。可以看到Storage内存占用降低了近45%.

图7.unpresis优化前storage内存占用

图8.unpresis优化后storage内存占用

以上为笔者总结的Spark内存模型介绍以及个人工作中的内存调优经验,由于个人水平有限,如果有错误的地方,还望读者指正。 如果总结的有点用的,点个赞呗~

参考资料

(1)www.ibm.com/developerwo…
(2)stackoverflow.com/questions/4…
(3)spark.apache.org/docs/latest…
(4)www.jianshu.com/p/10e91ace3…
(5)www.slideshare.net/databricks/…

相关文章