阅读代码:Spark 与 Flink 中的 RPC 实现

2020-07-01 00:00:00 代码 消息 调用 接口 方法

阅读与知识的相对维度:目录zhuanlan.zhihu.com

近日常有同学来问我如何阅读代码,关于这个问题的一般性答案我特别提了一个问题并自问自答。出于提供一个实际的例子的考量,正好此前综合地阅读了 Spark 的 RPC 实现、Flink 基于 Akka 的 RPC 实现和 Actor Model 的通信模型,写成本文分享我阅读分布式计算系统 Spark 和 Flink 中的 RPC 实现的过程和思考。

简介 Actor Model 及 Akka 的问题

通常来说,阅读代码的流程是首先了解自己要阅读的代码解决了什么问题,这个问题的现有解决方案是什么,有什么优势和缺点。大致清楚了这些背景之后再在走读代码的过程中思考阅读的代码具体是怎么解决这个问题的,后专注到重点难点的代码块的理解上。也就是说,代码阅读重要的不是代码。代码只是将思考的结果转换为实际可用的软件的手段,思考的结果或者说解决问题的方法才是重要的内容。

分布式计算系统的分布式特性决定了设计过程中必然会考虑节点间的通信问题,即笼统的 RPC 需求。关于 RPC 和 RMI 及 Actor Model 具体的差别本文不做展开,主要集中在 Spark 和 Flink 的 RPC 实现来介绍 Actor Model 下的 RPC 实现。

Actor Model 的主要概念包括

  • 通信的主体 Actor
  • 通信的内容 Message
  • 单线程先到先处理的消息处理器 Mailbox

特别需要提及的是 Actor 之间的通信是通过类似于地址的 ActorRef 来引用其他的 Actor 的,同时,在实现中,需要一个支持 Actor Model 运行起来的 ActorSystem 环境。这些具体的概念和名词属于 Akka,我们会在后面看到它们如何在 Spark 和 Flink 中被一一对应。

Actor Model 一个很少被注意的特点是它的建模过程中只存在 tell 这一个通信原语,ask 等等只是构建在 tell 上层的方便的通信模式。这就导致一个问题,即 Actor Model 原生的编程模式是明显不同于传统的编程模型的。传统的编程模型中,函数调用是有返回值的,即使采用 Future 作为返回值的占位符,本质上还是有一一对应的返回值的;而在 Actor Model 中,消息一经发出就被遗忘,即所谓的 fire and forget 模式。要建立当前发出的消息和稍后收到的消息之间的 ask and answer 关系,需要额外的工作。这部分的内容可以参考 Akka 官方文档中介绍通信模式的章节,本身可以作为 Akka 佳实践的一部分,有时间我会专门写一篇文章介绍 Actor Model 下完全被颠覆的编程模型以及通过在其上模拟常见的编程模型来探索 Actor Model 的佳实践。

关于更多 Actor Model 的概念性和介绍性资料,可以参考的资料有 Akka 的官方文档和《反应式设计模式》等等。

Akka 作为目前成熟的 Actor Model 的实现之一,以及拥有容易理解的单线程 Actor 和并发通信模型,广泛地充当了 JVM 系的分布式系统的 RPC 层。Akka 近的演化有两个重点,一个是类型化(Typed)的 Akka,另一个是在拆分行为(Behavior)和状态(State)的概念。前者我们后面看到 Spark 和 Flink 的 RPC 实现时就能看到选择标准的不同,后者这里不作展开,可能会在后续讨论函数式编程的文章中再次提及。

尽管 Akka 的实现非常成熟,但是直接使用 Akka 的底层 Actor Model 的软件却不多。对于业务软件来说,Akka Model 过于底层,如果要利用它带来的好处通常会直接使用 Akka Streams 和 Akka HTTP 等上层建筑;对于其他分布式系统来说,主要有以下两个问题。

个问题是两层集群的负担。如果我们使用 Akka 作为底层 RPC 的实现,本身 Akka 会有相应的基础组件,包括 ActorSystem 或者进一步使用 Akka Cluster 的话相应的 Cluster 对象。我们的分布式系统例如 Spark 和 Flink 本身有自己的集群管理策略,在 Spark 中有 Driver 和 Worker 的概念,在 Flink 中有 JobManager 和 TaskManager 等概念。如果在处理本身系统的集群管理的同时还要兼顾底层的 Akka 集群,这样两层的集群在实际开发和运维的过程当中会带来额外的复杂性。尤其是 Akka 作为一个功能复杂的重量级框架,并且在 Typed Akka 中做出了限制公开的直接沟通两个 Actor 的能力,强制要求使用 Akka Cluster 的决定。同时处理两层集群复杂的状态机和角色与消息的转换将会是一个巨大的负担。

第二个问题是版本的负担,这也是 Spark 走向去 Akka 化的直接原因,也是 Flink 社区经常被提问的一个问题。我们知道,为了保证分布式系统的稳定性,它依赖的组件尤其是 RPC 实现这样底层模块的依赖版本会保持相当的稳定性。这样就有一个问题,Spark 和 Flink 的用户在使用它们的同时也很有可能使用 Akka,并且依赖的是另一个 Akka 的版本。这样,就会出现版本不同带来的不兼容性问题。通常来说,这一点可以通过发布一个项目专有的第三方依赖并使用 shaded 技术重定位包名来解决问题。但是由于重定位为了覆盖反射调用,是在字节码级别对全限定名和字符串的包名前缀做替换。一般来说,包名都是诸如 org.apache.spark 或者 org.apache.flink 的形式,具有性,替换起来不会有什么问题。Akka 就不一样了,它的包名是 akka.actor 等等,跟配置名称是一样的。这就导致重定位会错误改动代码中的配置名字符串导致运行时字符串失配出错。版本问题在 Lightbend 全家桶里是不存在的,例如 Play 通过接口暴露底层的 Akka 数据结构,并固定依赖到某一个版本,这样使用 Play 的人需要 Akka 的功能是只需要通过接口拿到对应的 Akka 数据结构就可以,但是这种方式并没有考虑和其他系统的版本兼容问题。

虽然上述问题可以通过定制 ClassLoader 并精心调整打包策略来绕过,或者要求用户程序使用跟系统框架兼容的 Akka 版本,但是这会导致复杂不友好的用户体验,而清楚简单的用户体验很多时候比功能更能决定一个框架的生存空间。同时,Akka 提供的很多功能,例如 Actor Model 基石的监督(Supervise)功能,对于上层提供 Failover 机制的 Spark 和 Flink 来说是多余的。前有用户体验的硬性需求,后有开发轻量化的敏捷需求,Ligetbend 系以外的成熟的分布式系统开发自己的 RPC 实现是理所当然的选择。

理解了 Spark 和 Flink 为什么要开发自己的 RPC 实现之后,我们再看到 RPC 实现具体的考量点和内容。

Spark 的 RPC 实现

Spark 开发自己的 RPC 实现以换下 Akka 的理由主要是上面提及的版本依赖问题,在社区中记录为 SPARK-5293。

阅读相关代码,首先我们要定位代码的位置。Spark 的 RPC 实现主要位于 core 模块下的 org.apache.spark.rpc 这个包下,阅读代码的过程中通过跳转到定义和查找使用点可以找到完整的脉络。结果而言,除了实际的 RPC Endpoint 实现之外,主要相关的代码还包括 common/network-common 路径下网络传输层相关的底层支持。

Spark 的 RPC 实现虽然是为了替换 Akka 而诞生的,但是它实际上可以看成一个简化版的 Akka,仍然遵循许多 Actor Model 的抽象。例如

  • RpcEndpoint 对应 Actor
  • RpcEndpointRef 对应 ActorRef
  • RpcEnv 对应 ActorSystem

RpcEndpoint 与消息处理模型

这其中从模型上来说简单的反而是 RpcEndpoint,因为所有的实现逻辑是具体实现类的事情,它其实只是一个简单的存根(Stub)。总的来说,RpcEndpoint 有以下接口

private[spark] trait RpcEndpoint {
  final def self: RpcEndpointRef = ???
  final def stop(): Unit = ???
  val rpcEnv: RpcEnv = ???

  def receive: PartialFunction[Any, Unit] = ???
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = ???

  def onError(cause: Throwable): Unit = ???
  def onConnected(remoteAddress: RpcAddress): Unit = ???
  def onDisconnected(remoteAddress: RpcAddress): Unit = ???
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ???

  def onStart(): Unit = ???
  def onStop(): Unit = ???
}

相关文章