万字干货 | 一文揭秘Presto在腾讯资讯业务中的应用
1
简介
基于SQL语言,上手成本低,而且功能强大,支持reduce和lambda函数;
纯计算引擎,解耦底层存储,可快速缩扩容;
纯内存计算,速度快,提供交互式的查询体验;
通过插件的方式实现拓展功能,二次开发友好;
通过不同的连接器(connector)插件读取异构数据源,进行联邦查询。
2
业务现状
2.1 业务构成
目前,个人接触过的业务包括资讯类的腾讯看点、腾讯医典,以及医生问诊相关的腾讯云医。
2.2 痛点问题
团队技术栈差异;
同类产品较多;
架构、历史遗留;
应用场景不同。
2.3 主要工作
Presto技术运营 联邦查询引擎改造适配 Presto Oteam引擎研发 技术输出
3
技术运营
由于身处业务的数据团队中,除了参与中台的技术研发,平时也会使用Presto,并且负责SQL相关问题的答疑,既是开发者,也是使用者。大多数人对Presto的印象,仅仅停留在“都是SQL引擎”上,其实不然。Presto的SQL语言能力非常出色。如slogan所宣传的那样,SQL on Everything
:不仅能够连接各种数据源,还能满足复杂的处理逻辑。如果认为“Presto在SQL层面上做到兼容Hive就差不多了”,那就没有真正发挥出Presto的威力。
array_join(array_agg(data order by event_time asc), '/')
A/B/A/A/C -> A/B/A/C
Row
类型,使得我们几乎可以在SQL中“编程”(这里使用针对array类型的reduce函数,更通用的聚合函数为reduce_agg)。终解法如下:-- 逻辑:6/4/6/6/10/20 -> 6/4/6/10/20
-- distinct adjacent elements
SELECT reduce(
ARRAY ['6', '4', '6', '6', '10', '20'], -- 输入
CAST(
ROW(ARRAY[], '')
AS ROW(arr ARRAY(VARCHAR), prev_ele VARCHAR)
), -- 初始状态S
(S, T) -> CAST(
ROW(IF(S.prev_ele=T, S.arr, S.arr||T), T)
AS ROW(arr ARRAY(VARCHAR), prev_ele VARCHAR)
), -- lambda输入函数I
S -> array_join(S.arr, '/') -- lambda输出函数O
);
长度为N的数组。每个元素将会依次送入lambda输入函数。
初始状态。个元素和该状态作为lambda输入函数次调用的参数。
一个lambda输入函数。调用N次。它接收一个状态和一个元素,产生一个新的状态。
一个lambda输出函数。调用一次。对3中处理完的终状态做一次变换。
reduce(array(T), initialState S, inputFunction(S, T, S), outputFunction(S, R)) → R
Row
类型的变量,它可以存储多个元素。个是去重数组arr,第二个是上一个元素的值prev_ele。lambda输入函数每次接收到一个新的值,和prev_ele比较,相等则什么也不做,不等则将新值放入去重数组中,同时更新prev_ele。reduce是一种通用的模型,lambda则大程度地利用了SQL的现有能力,使得Presto的SQL表现力更加强大。3.2 窗口函数
RANGE:当前窗口会包含值相同的相邻行。
ROWS:当前窗口不会包含值相同的相邻行。
UNBOUND PRECEDING:排序后个元素。
UNBOUND FOLLOWING:排序后后一个元素。
N PRECEDING:排序后,当前行的前N行。
N FOLLOWING:排序后,当前行的后N行。
-- value为关心的值
-- 以index进行排序
WITH
t1 (value, index) AS
(
SELECT * FROM (VALUES ('a', 1),
('b', 2),
('c', 3),
('d', 4),
('e', 4),
('f', 5),
('g', 5),
('h', 6))
)
SELECT *,
-- 默认
array_agg(value) OVER
(ORDER BY index) res,
-- [开头, 当前值]
array_agg(value) OVER
(ORDER BY index RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) res_range_uc,
-- [开头, 当前行]
array_agg(value) OVER
(ORDER BY index ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) res_rows_uc,
-- [当前值, 末尾]
array_agg(value) OVER
(ORDER BY index RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) res_range_cu,
-- [当前行, 末尾]
array_agg(value) OVER
(ORDER BY index ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) res_rows_cu,
-- [前1个值,后1个值] 不支持
-- array_agg(value) OVER (ORDER BY index RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) res_range_11, not support
-- [前1行,后1行]
array_agg(value) OVER
(ORDER BY index ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) res_rows_11
FROM t1;
presto>
value | index | res | res_range_uc | res_rows_uc | res_range_cu | res_rows_cu | res_rows_11
-------+-------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+-------------
a | 1 | [a] | [a] | [a] | [a, b, c, d, e, f, g, h] | [a, b, c, d, e, f, g, h] | [a, b]
b | 2 | [a, b] | [a, b] | [a, b] | [b, c, d, e, f, g, h] | [b, c, d, e, f, g, h] | [a, b, c]
c | 3 | [a, b, c] | [a, b, c] | [a, b, c] | [c, d, e, f, g, h] | [c, d, e, f, g, h] | [b, c, d]
d | 4 | [a, b, c, d, e] | [a, b, c, d, e] | [a, b, c, d] | [d, e, f, g, h] | [d, e, f, g, h] | [c, d, e]
e | 4 | [a, b, c, d, e] | [a, b, c, d, e] | [a, b, c, d, e] | [d, e, f, g, h] | [e, f, g, h] | [d, e, f]
f | 5 | [a, b, c, d, e, f, g] | [a, b, c, d, e, f, g] | [a, b, c, d, e, f] | [f, g, h] | [f, g, h] | [e, f, g]
g | 5 | [a, b, c, d, e, f, g] | [a, b, c, d, e, f, g] | [a, b, c, d, e, f, g] | [f, g, h] | [g, h] | [f, g, h]
h | 6 | [a, b, c, d, e, f, g, h] | [a, b, c, d, e, f, g, h] | [a, b, c, d, e, f, g, h] | [h] | [h] | [g, h]
(8 rows)
3.3 高阶运营
SqlBase.g4
。比如以下SQL为什么可以运行?不是所有查询语句都需要以select开头:presto> (VALUES ('a', 1),('b', 2));
_col0 | _col1
-------+-------
a | 1
b | 2
(2 rows)
StatementAnalyzer
。比如窗口函数执行完成后,用标量函数做一些加工处理,必须写在整个窗口函数func2(func1() over ())
的外面,而不是func2(func1()) over ()
。--报错
array_join(array_agg(concat(col1, col2)), '/')
over (partition by user_id order by event_time)
vs
--成功
array_join(
array_agg(concat(col1, col2))
over (partition by user_id order by event_time),
'/')
3.4 语法、语义错误
4
联邦查询引擎
搭建各种ETL Pipeline,维护成本高;
数据分析速度严重拖慢。
为了适配内部的tHive,我们在MetaStore的Thrift RPC协议中植入了内部鉴权机制;
针对云上ES的网络情况,禁用了自动嗅探逻辑。
4.1 tHive连接器适配
Kerberos
机制的实现(下图),可以看到rawTransport作为参数,用来构建一个新的SaslTransport。TSaslClientTransport
的源码可以发现,这里其实是计算机网络分层思想的典型应用。在可靠传输层rawTransport的基础上,再包装了一个Sasl层。利用底层rawTransport提供的可靠传输能力,进一步提供安全策略。e.g. 某些QoS条件下,调用Sasl层的write()
,会对数据进行加密,Sasl进而调用下一层的write()
函数,将加密后的数据发送到可靠的传输通道中。它们都实现了TTtransport
接口,I/O函数如下所示:open()
close()
flush()
readAll()
write()
SecurityProvider
。总结:对于小白来说,“为Hive连接器增加一种鉴权机制”是个很难理解的技术需求。通过前文的探索,我们发现其本质是:“如何在HMS的Thrift RPC中,为SASL鉴权层增加一种自定义的安全协议。”这里的上下文比较多,需要对HMS、THrift RPC、SASL、JCA、Kerberos等概念有大概的了解,才知道需要做什么。对技术的提升还是很有帮助的。
4.2 ES连接器踩坑
Airlift
后台框架。在这个场景下,通过Bootstrap
注册的类会被生命周期管理器识别,@PostConstruct
注解(annotation)标记的函数会在类实例化后被自动调用。可以看到,一个refreshNodes()
函数被定期调用了,该函数会获取ES集群中所有的可用节点IP,并在下次将请求发送到其中一个节点。ElasticSearchClient
文件。在改造的过程中,发现已经有参数elasticsearch.ignore-publish-address
可以满足需求,但是在去年8月的时候,DB、SQL的文档里竟然没有记录这个参数,在GitHub上搜索一波发现已有issue了,目前社区已经补齐了文档。Airlift
后台框架虽然没有文档,但开发者还是要认真看。5
Oteam共建
Hive语义兼容,函数迁移
RBO/CBO执行解析器
Worker Tag能力
分析函数开发
语法/语义扩展
动态数据源支持
查询性能优化专项
Coordinator执行流程优化
bug fix……
限于篇幅,简单介绍一点:标量函数开发原理。
5.1 函数开发
使用注解框架的普通函数;
使用字节码适配的变长参数函数。
种方式需要使用Presto引擎的注解框架,官网给的例子比较简单,各种注解搭配使用的方式实际比较复杂。同时函数的数据类型需要涉及Presto引擎的Slice
、Block
等类型,有一定学习成本。第二种方式比较少见,而且不支持通过插件进行开发,只能写到presto-main
模块中,它基于Presto自带的字节码框架动态生成字节码(包com.facebook.presto.sql.gen
),是比较hack的实现,可以参考ArrayConcatFunction
。
5.2 函数注解框架
需要使用注解(annotation)标记出该函数是一个可供调用的标量函数,包括函数名、返回类型、参数类型等。 Java原生类型和Presto类型有一一对应的关系。Java的 Slice
对应Presto中的Varchar
类型,Java的Block
对应Presto中的Array
类型。(下文分别称为Java类型和SQL类型。)这些特定的Java类型逻辑上等价于 String
、Array数组
,但是API差别很大,前期有一定的上手成本。函数有两套签名。基于反射可以获取Java类型的形参、返回值类型,称为方法签名。基于 @SqlType
注解可以获取Presto引擎使用的参数、返回值类型,称为函数签名。这里做个严格的区分。可以使用 @TypeParameter
函数注解引入泛型变量。在函数体声明相关的泛型参数,供SqlType
引用。可以使用 @LiteralParameter
函数注解引入字面量变量。可以使用形参注解 @TypeParameter
、@LiteralParameter
、@FunctionDependency
、@OperatorDependency
声明一些依赖型参数,在调用函数之前,Presto会根据解析出来的元数据,自动注入参数依赖。
函数注解 | 形参注解 | |
public class ExampleNullFunction
{
@ScalarFunction("is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
@SqlType(Standard*.BOOLEAN)
public static boolean isNull(@SqlNullable @SqlType(Standard*.VARCHAR) Slice string)
{
return (string == null);
}
}
isNull函数体有三个注解, @ScalarFunction
定义了函数名和calledOnNullInput
属性。@Description
定义了函数的描述字段,在Presto客户端用show functions命令可以看到函数的描述信息。@SqlType
描述了函数的返回值类型。这些是函数注解。形参的SQL类型是VARCHAR,Java类型是Slice。如果Slice换成其他类型,函数调用会失败。这个是形参注解。 返回值、形参都有 @SqlType
注解,它们定义了SQL类型。在Presto引擎层面,基本都是使用SQL类型来进行解析的。
@ScalarFunction(name = "is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
public final class IsNullFunction
{
@TypeParameter("T")
@SqlType(Standard*.BOOLEAN)
public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(Standard*.BOOLEAN)
public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(Standard*.BOOLEAN)
public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
{
return (value == null);
}
// ...and so on for each native container type
}
@TypeParameter
函数注解,引入了一个泛型变量T
,可以在形参注解中被@SqlType引用。@SqlType
注解的类型声明为T
以后,这几个函数的函数签名都是一样的。在Presto引擎看来,这几个函数拥有相同的函数签名,是一类函数。问:为什么第二个例子的 @ScalarFunction
和@Description
注解是写在类名上面而不是函数名上面?答:写在类名上,代表这个类中的所有方法的函数签名都是一样的,由一个 ParameticScalar
类进行管理。问:Slice和Java的String是什么关系,需要怎么处理转换? 答:参考其他函数的实现,可以调用 toStringUtf8()
转换成String
类型再做处理。问:哪些注解是一定要写的?哪些是可选的?哪些是在某些条件下需要同时出现的? 答:比如说 @ScalarFunction
中的calledOnNullInput
属性,当形参中有以下任意注解(@SqlNullable
,@BlockPosition
,@IsNull
)的时候,需要指定为true,默认为false。
虽然Presto文档只讲了冰山一角,但是引擎内部自带了很多函数,是非常有价值的参考资料。这里有很多细节,需要看Presto源码才能得到答案。以上只是注解的使用,具体这个自定义函数后续如何被Presto引擎解析,不关注问题也不大。即使注解写错了,大部分case也会在插件装载的时候被识别出来。推荐高阶开发者看看ParametricScalarImplementation
中标量函数的解析流程。
5.3 常用注解参考手册
元数据 | 含义 |
show functions
命令可以查看。@Constraint
函数注解,则需要满足它定义的表达式条件。对于形参注解,它引入一个依赖型参数。类型 | 含义 |
InterpretedFunctionInvoker
的空值处理逻辑。可以看到,contains
函数有多种类型,但是函数签名都是一样的。由于在函数中需要根据实际类型来调用接口读取元素,因此T的实际类型必须通过形参的方式传递进来,但是用户写SQL的时候并不用显式指定类型,因为它可以自动推导出来,这里涉及methodHandle的绑定参数,就不详细展开了。总之,虽然contains()
有四个参数,但是用户只感知后两个。
5.4 变长参数函数
parse_simple_json
函数,在ETL任务中一次调用解多个key,是比较高效的。虽然是变长参数,但是这里的变长,是相对不同用户提交的SQL语句而言的。而用户每一次提交的SQL,其实参数个数都是确定的,没有必要用变长参数,e.g. 对于一个SQL,代码中的parse_simple_json(d4, 'key1', 'key2')
,其实参数就是三个。函数声明为变长,但是实际中根据每条SQL语句转成定长参数。针对这种情况,Presto引擎并没有使用注解框架,而是采用了比较hack的方式,直接定义一个内部函数类,里面有一个形参为数组的业务函数。通过引擎自带的字节码生成模块,把它适配成一个定长参数函数。大致原理如下所示:6
技术输出
7
腾讯内部应用概览
后列出部分腾讯内部应用的Presto情况。
7.1 应用场景
TEG-大数据平台统一SQL引擎SuperSQL,Presto作为计算引擎融合的一部分,实现联邦数据访问,计算加速等功能,支持交互式数据分析场景。 PCG-欧拉中台,在数据质量监控和资产洞察以及在线数据服务的数据装载中,作为计算查询引擎。 TEG/CSIG联合-云原生数据湖计算DLC,用户使用标准SQL即可完成对象存储服务(COS)及其他云端数据设施的联合建模、分析,无服务器架构(Serverless Presto)作为底层计算引擎。 CSIG-云日志服务CLS,扩展了大量自定义SQL函数,以丰富PB级日志实时SQL分析能力。Presto支持底层存储解耦,提供不同场景日志需求,以及异构存储联合查询。 CSIG-医疗资讯与服务部,作为业务线数据服务平台联邦查询引擎,统一查询前端语言,计划打通用户状态存储的MySQL、流水日志存储的ES、用户行为数仓的Hive/ClickHouse/Iceberg等。 PCG-腾讯看点,连接部门内20余个异构数据源的联邦查询引擎,适配了腾讯内部的Hive/ES/CH/Redis等数据源。 IEG-数据中台,作为数据查询服务联邦查询,Adhoc场景执行引擎。
7.2 合作生态
8
后续计划
结合业务场景,完成引擎和相关连接器的优化改造
丰富联邦查询引擎应用层功能
数据科学引擎
强化技术输出 9
新书推荐
Matt Fuller、Manfred Moser、Martin Traverso 著 张晨 黄鹏程 傅宇 译 封面印象:Presto运行SQL,就像青蛙吃虫子一样快? 本书是由Presto创始团队推出的首本官方实战指南,内容质量是毋庸置疑的。对于初学者来说,左手官网文档,右手《Presto实战》进行入门应该是标准姿势。其行文的层次性、结构性,内容的完整性、权威性,对新手和高手来说都是一本非常好的"字典"。推荐给有兴趣的同学~
相关文章