Spark内存模型介绍及Spark应用内存优化踩坑记录
Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解Spark内存管理的基本原理,有助于更好的开发Spark应用程序和进行性能调优。同时,有效率的内存使用是Spark应用高效性能表现的关键所在,不合理的内存使用就会导致Spark性能表现的很糟糕。
在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程。
Driver程序主要负责:
(1)创建Spark上下文;
(2)提交Spark作业(Job)并将Job转化为计算任务(Task)交给Executor计算;
(3)协调各个Executor进程间任务调度。
Executor程序主要负责:
(1)在工作节点上执行具体的计算任务(Task),并角计算结果返回给Driver;
(2)为需要持久化的RDD提供存储功能。
由于Driver的内存管理比较简单,和一般的JVM程序区别不大,所以本文重点分析Executor的内存管理。所以,本文提到的内存管理都是指Executor的内存管理。
Executor内存
Executor进程作为一个JVM进程,其内存管理建立在JVM的内存管理之上,整个大致包含两种方式:堆内内存和堆外内存,大致如下图1所示:
图1.Executor内存结构堆内内存
指的是JVM堆内存大小,在Spark应用程序启动时通过spark.executor.memory参数配置。Executor内运行的并发任务共享JVM堆内内存。堆内内存大致可以分为如图2所示以下4个部分(以统一内存管理机制为例):
图2.JVM堆内内存结构- Storage内存:主要用于存储Spark的cache数据,例如RDD的cache,Broadcast变量,Unroll数据等。需要注意的是,unrolled的数据如果内存不够,会存储在driver端。
- Execution内存:用于存储Spark task执行过程中需要的对象,如Shuffle、Join、Sort、Aggregation等计算过程中的临时数据。
- User内存:分配Spark Memory剩余的内存,用户可以根据需要使用。可以存储RDD transformations需要的数据结构。
- Reserved内存:这部分内存是预留给系统使用,是固定不变的。在1.6.0默认为300MB(RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024),不过在2.4.4版本中已经看不到这个参数了。
堆外内存
这里Off-heap Memory从概念上可以分为两个:
Off-heap Memory(*)
对应图1的Executor JVM内的off-heap memory(*),主要用于JVM自身,字符串, NIO Buffer等开销,可以通过spark.executor.memoryOverhead参数进行配置,大小一般设为executorMemory * 0.10, with minimum of 384。
Off-heap Memory(**)
对应图1的(**): 为了进一步优化内存的使用以及提高Shuffle时的排序的效率,Spark引入了堆外(Off-heap)内存,直接在工作节点的系统内存中开辟的空间,存储经过序列化的二进制数据。Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被地申请和释放,而且序列化的数据占用的空间可以被计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同如下图3所示(以统一内存管理机制为例),所有运行中的并发任务共享存储内存和执行内存。
图3.Executor堆外内存结构统一内存管理机制
Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,如图 2 和图 3 所示,其中重要的优化在于动态占用机制,其规则如下:
- 设定基本的Storage内存和Execution内区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- Execution内存的空间被对方占用后,可让对方将占用的部分转存到硬盘如图4所示,然后"归还"借用的空间
- Storage内存的空间被对方占用后,无法让对方"归还",多余的Storage内存被转存到硬盘如图5所示,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂
Spark应用内存优化
case 1
某线上Spark应用,executor memory配置如下:
spark.executor.memory = 124G
spark.executor.memoryOverhead = 10G
结果在一些大数据量情况下,内存超出报错如下图6:
这个问题从报错的提示就比较好理解,提高spark.yarn.executor.memoryOverhead的内存就ok了,终在我们这个应用上提高至20G成功跑过了任务。但是笔者调研发现,在有些别人Spark应用下,不管怎么提升这个值都没用。附上Stack Overflow讨论 exceeding-memory-limits。
case 2
同上某线上Spark应用(Spark版本1.5.1),运行时Storage内存如下图7所示。 经过代码走读发现,该业务实现中,对于同一份血统(Lineage)的RDD数据,每次完成一次Action计算后都会cache计算结果的RDD数据。就导致了同一份血统的RDD数据多次持续递进cache到内存的情况。由于Spark 1.6 之后才引入的统一内存管理机制,在某些数据量较大的任务下,就会导致内存溢出。 解决方法是在每一次cache计算结果的RDD数据后就手动释放(unpresis)之前同血统的RDD数据。优化后,运行时Storage内存如下图8所示。可以看到Storage内存占用降低了近45%.
图7.unpresis优化前storage内存占用 图8.unpresis优化后storage内存占用以上为笔者总结的Spark内存模型介绍以及个人工作中的内存调优经验,由于个人水平有限,如果有错误的地方,还望读者指正。 如果总结的有点用的,点个赞呗~
参考资料
(1)www.ibm.com/developerwo…
(2)stackoverflow.com/questions/4…
(3)spark.apache.org/docs/latest…
(4)www.jianshu.com/p/10e91ace3…
(5)www.slideshare.net/databricks/…
相关文章