flink源码阅读篇—入口
前序
- 由于近接触了flink相关项目,封装flink-table模块,这部分模块应该在flink官方1.9x版本进行发布,截止目前还是beta版本,等待终的release版本发布。在开发期间,出于工作和兴趣的需求,就阅读了部分源码,阅读源码期间也是阅读了很多博客文章,发下文章写错的也比比皆是呀,哎有时也会误导人。先叙述篇总体轮廓篇。该篇总体思路是从flink任务提交开始,从本地提交代码逻辑,到服务端如果接收任务,后运行的不同分支逻辑。了解这部分逻辑,需要一些基础知识,包括yarn, netty基本的了解。关于yarn和netty介绍可以参考
flink任务提交方式
- flink提交方式和spark类似,比spark还略微复杂些。大体分这么几类 1、单机本地体检,2、多机集群提交,3、yarn-session提交,4、yarn-cluster per-job提交、5、还包括mesos和docker提交(这俩个略叙)。 生产环境中用第四种比就多,每个任务作为一个yarn application提交到集群,申请的资源和其他任务是隔离的,其他方式相对这个都略显简单。下面主要介绍第四种Per-Job-Cluster。
- 在看代码之前先对flink组件有个大概的初步认知:
- 1、Dispatcher(Application Master)提供REST接口来接收client的application提交,它负责启动JM和提交application,同时运行Web UI。
- 2、ResourceManager:一般是Yarn,当TM有空闲的slot就会告诉JM,没有足够的slot也会启动新的TM。kill掉长时间空闲的TM。
- 3、JobManager :接受application,包含StreamGraph(DAG)、JobGraph(logical dataflow graph,已经进过优化,如task chain)和JAR,将JobGraph转化为ExecutionGraph(physical dataflow graph,并行化),包含可以并发执行的tasks。其他工作类似Spark driver,如向RM申请资源、schedule tasks、保存作业的元数据,如checkpoints。如今JM可分为JobMaster和ResourceManager(和下面的不同),分别负责任务和资源,在Session模式下启动多个job就会有多个JobMaster。
- 4、TaskManager:类似Spark的executor,会跑多个线程的task、数据缓存与交换。
代码分析:
- Per-Job-Cluster模式也分为本地和远端。
- 本地模式:
本地流程
- 与Session-Cluster模式类似,入口也为CliFrontend#main
- 解析处理参数
- 根据用户jar、main、程序参数、savepoint信息生成PackagedProgram
- 根据PackagedProgram创建JobGraph(对于非分离模式还是和Session模式一样,模式Session-Cluster)
- 获取集群资源信息
- 部署集群YarnClusterDesriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal;
- 进行资源校验(如内存大小、vcore大小、队列)
- 通过YarnClient创建Application
- 再次校验资源
- AbstractYarnClusterDescriptor#startAppMaster启动AppMaster
- 初始化文件系统(HDFS)
- 将log4j、logback、flink-conf.yaml、jar包上传至HDFS
- 构造AppMaster的Container(确定Container进程的入口类YarnSessionClusterEntrypoint),构造相应的Env
- YarnClient向Yarn提交Container申请
- 跟踪ApplicationReport状态(确定是否启动成功,可能会由于资源不够,一直等待)
- 启动成功后将对应的ip和port写入flinkConfiguration中
- 创建与将集群交互的ClusterClient
- 根据flink-conf的HA配置创建对应的服务(如StandaloneHaServices、ZooKeeperHaServices等)
- 创建基于Netty的RestClient;
- 创建/rest_server_lock、/dispatcher_lock节点(以ZK为例)
- 启动监听节点的变化(主备切换)
- 通过ClusterClient获取到appId信息并写入本地临时文件AbstractYarnClusterDescriptor#startAppMaster中与Session-Cluster有一个显著不同的就是其会将任务的JobGraph上传至Hdfs供后续服务端使用
经过上述步骤,客户端提交任务过程就完成了,主要涉及到文件(JobGraph和jar包)的上传。
远端流程
- 远端宿主在Container中的集群入口为YarnJobClusterEntrypoint#main
- ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster启动集群
- 创建JobDispatcherResourceManagerComponentFactory(用于创建JobDispatcherResourceManagerComponent)
- 创建ResourceManager(YarnResourceManager)、Dispatcher(MiniDispatcher),其中在创建MiniDispatcher时会从之前的JobGraph文件中读取出JobGraph,并启动进行ZK选举
- 当为主时会调用Dispatcher#grantLeadership方法
- Dispatcher#recoverJobs恢复任务,获取JobGraph
- Dispatcher#tryAcceptLeadershipAndRunJobs确认获取主并开始运行任务
- Dispatcher#runJob开始运行任务(创建JobManagerRunner并启动进行ZK选举),
- 创建JobManagerRunner(处理leader选举)
- 创建JobMaster(实际执行任务入口,包含在JobManagerRunner)
- 启动JobManagerRunner(会进行leader选举,ZK目录为leader/${jobId}/job_manager_lock)
- 当为主时会调用JobManagerRunner#grantLeadership方法
- 启动JobMaster
- 将任务运行状态信息写入ZK(/${AppID}/running_job_registry/${jobId})
- 启动JobMaster的Endpoint
- 开始调度任务JobMaster#startJobExecution
接下来就进行任务具体调度(构造ExecutionGraph、申请Slot等)流程。
Flink On Yarn With dispatcher
主要结合一下yarn的执行流程,介绍一下任务调转执行流程。
- Dispatcher组件负责接收作业提交,持久化它们,生成JobManagers以执行作业并在Master故障时恢复它们。此外,它知道Flink会话群集的状态。
- 某些集群管理器需要一个集中的作业生成和监视实例
- 它包含独立JobManager的角色,等待提交作业
- 当开始一个新的Flink yarn 会话时,客户端首先检查所请求的资源(containers和内存)是否可用。如果资源够用,之后,上传一个jar包,包含Flink和HDFS的配置。
- 客户端向yarn resource manager发送请求,申请一个yarn container去启动ApplicationMaster。
- yarn resource manager会在nodemanager上分配一个container,去启动ApplicationMaster
- yarn nodemanager会将配置文件和jar包下载到对应的container中,进行container容器的初始化。
- 初始化完成后,ApplicationMaster构建完成。ApplicationMaster会为TaskManagers生成新的Flink配置文件(使得TaskManagers根据配置文件去连接到JobManager),配置文件会上传到HDFS。
- ApplicationMaster开始为该Flink应用的TaskManagers分配containers,这个过程会从HDFS上下载jar和配置文件(此处的配置文件是AM修改过的,包含了JobManager的一些信息,比如说JobManager的地址)
- 一旦上面的步骤完成,Flink已经建立并准备好接受jobs。
代码详细分析
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
相关文章