Flink深入浅出:Sql Gateway源码分析

2020-06-29 00:00:00 查询 创建 集群 执行 提交

Flink Sql Gateway是Flink集群的“任务网关”,支持以rest api的形式提交查询、插入、删除等任务。

官网推荐与Flink jdbc driver搭配使用,不过jdbc driver限制了很多功能,如果自己开发数据平台,好还是直接使用sql gateway。想要透彻的使用sql gateway,需要了解一下问题:

1 如何处理并发请求?多个提交怎么处理?

2 如何维护多个sql之间的关联性?

3 sql字符串如何提交到集群形成代码

4 sql-gateway都支持哪些功能?


1 整体架构


Sql Gateway其实内部说白了就是一个SessionManager会话管理器以及一个SqlGatewayEndpoint网络服务器。

其中SqlGatewayEndpoint是基于runtime.rest.RestServerEndpoint实现的Netty服务器,通过统一的架构设计,实现了多种handler,如创建会话、提交任务、任务状态查询、任务取消、拉取数据等。所有handler使用公用的SessionManager进行会话管理。SessionManager中维护了一个sessionId和Session的Map,考虑到并发问题,底层采用ConcurrentHashMap作为并发存储。

Session是具体的会话,如果使用RestAPI,则首先需要通过createSession创建session,然后只要指定为该sessionId的请求,都会在一个Session中执行。

所有的提交的任务大体可以分成两类,JobOperation和非JobOperation。JobOperation就是需要提交到Flink集群执行的任务,如select 和 insert;非JobOperation就是不需要提交到集群执行的操作,如create view, create table, drop table等。因此提交到sql gateway的所有任务,会按照sessionId分组,挨个任务执行,当为select 或者 insert时,加入到Session内部的jobOperations集合中,同样使用ConcurrentHashMap存储,目的是为了支持并发的任务提交、查询、取消。

在Session中维护了几个比较重要的组件,如:originalSessionEnv,这是sql gateway启动时配置文件中的参数,如执行引擎、并行度、端口号等(Session共用);defaultContext为启动时的环境变量和jars等信息(Session共用);重要的是ExecutionContext,它是Session独享的,会根据创建Session时提供的planner、execute type等信息,初始化内部的变量。比如在streaming模式下,executionContext内部的tEnv为StreamTableEnvironment;如果是batch模式下,则为BatchTableEnvironment。

总结来说,Session中既有所有Session共享的资源(如执行默认参数、类加载器等),也有Session独享的资源(如tEnv,catalogManager等)。


2 请求流程


sql gateway其实只是一个普通的NIO服务器,黄色部分为各个Handler,每个Handler都会持有SessionManager的引用,因此可以共同访问同一个SessionManager对象。当请求到达时,Handler会获取请求中的参数,如SessionId等,去SessionManager中查询对应的Session,从而执行提交sql、查询任务状态等工作。


3 操作详解

3.1 创建会话

会话的创建是使用sql gateway的步,必须的参数有name名称、planner执行引擎(Blink或原生的flink)、execute type(streaming或者batch)、properties(配置参数,如并发度等)。在SessionMnager中,会根据这些参数创建对应的SessionContext,并创建Session放入Map集合中。后返回对应的SessionId,方便后续使用。



3.2 关闭会话

有创建自然就有关闭,首先是主动关闭方式,即发送restapi,传入对应的SessionId,sessionManager会自动移除Map中对应的Session,对应的资源也会在垃圾回收时清理。另一种方式是被动关闭,sql gateway在启动时,自动调用了sessionManager的open方法,内部开启了一个定时线程(默认间隔为1小时),定时查询Session集合,观察是否存在过期的Session并进行移除。


3.3 任务提交

sql gateway的任务提交是重点,正常接收到请求后,会包含必要参数sessionid和statement,通过id获取到指定的session,并执行对应的runstatement方法。这个方法首先会使用SqlCommandParser解析sql,通过正则表达式判断sql的类型,比如是create view还是insert还是select。然后通过OperationFactory创建对应的操作,factory会返回具体的Operation,每种操作都有一个execute方法内部封装了具体的实现细节。比如如果是查询操作,就形成DAG图提交到集群;如果是创建表,则可以直接在本地修改对应的Catalog。如果是select或者insert操作,还需要提交到集群把对应的JobId存储起来,方便后续查询状态、拉取数据等。后返回每种执行的结果和对应的操作类型。

在Select操作中具体是如何提交任务的,稍后再详细说明。



3.4 任务状态

上面任务提交后,会把对应的任务id返回给客户端;客户端可以根据这个jobid查询对应的任务状态。查询时,Session会遍历内部的jobOperations,直接获取对应的Job描述符,内部封装了查询状态的细节。



3.5 拉取数据

在数据拉取中,batch模式和streaming模式拉取的方式是不同的。在batch模式中,内部维护了集群中表的引用,当任务执行完成,对应的结果会自动拉取到本地缓存起来,后面根据缓存的内容以及客户端希望拉取的数量分页返回。在Streaming模式中,拉取的结果是每次变更的内容,其他的细节都是类似的。


3.6 任务如何提交到集群

以SelectOperation为例,执行execute方法时,内部首先调用executeQueryInternal。先根据提交的sql创建对应的Table,即执行tEnv.sqlQuery()即可。然后根据Table创建结果表Sink,Sink为自定义的TableSink。然后把创建的Table直接插入到Sink中。通俗点说,就是在执行select查询时,sql gateway自动创建了一个临时表,存储结果;当结果拉取到本地后,再删除该表。整个插入过程终通过executor编译生成pipeline,即DAG图。通过ProgramDeployer把DAG图部署到集群。这个ProgramDeployer内部仍然是使用SPI扫描满足条件的PipelineExecutorFactory,比如我们的sql gateway内部是yarn-session模式,那么就会扫描寻找yarn-session的工厂即,YarnSessionClusterExecutorFactory。从而调用对应的execute()方法提交DAG图。这里的execute方法内部有一个yarnClient,帮助把任务发送到yarn启动。


4 总结


再来回顾前文中提到的几个关键问题:


4.1 如何处理并发请求?多个提交怎么处理?

sql gateway内部维护SessionManager,里面通过Map维护了各个Session,每个Session的任务执行是独立的。同一个Session通过ExecuteContext内部的tEnv按顺序提交。


4.2 如何维护多个sql之间的关联性

在每个Session中单独维护了tEnv,同一个session中的操作其实是在一个env中执行的。因此只要是同一个session中的任务,内部使用的tEnv就是同一个。这样就可以实现在A session中,先创建一个view,然后执行一个select,后执行一个insert。


4.3 sql字符串如何提交到集群形成代码?

Session中维护了tenv,sql会通过tenv编译生成pipeline(即DAG图),在batch模式下是Plan执行计划;在stream模式下是StreamGraph。然后Session内部会创建一个ProgramDeployer代码发布器,根据Flink中配置的target创建不同的excutor。后调用executor.execute方法提交Pipeline和config执行。


4 sql-gateway都支持哪些功能?

目前支持的功能,可以SqlCommandParser中的枚举类看到,如

show catalogs 
show current catalog 
show database 
show current database 
show tables 
show functions 
show modules 
use catalog xxx_catalog_name 
use xxx_database_name 
describe xxx_table_name 
explain xxx_select 
with x as (xxx) select * from x 
insert into xxx select * from x 
insert overwrite xxx partition(p=xx) select * from xxx 
create table xxx() with () 
drop table xxx 
create view xxx as xxx 
create database xxx 
drop database xxx 
drop view xxx 
alter database xxx 
alter table xxx 
set xxx=xxx 
reset

相关文章