动态代码生成技术在 Presto 中使用简介
在《ASM 与 Presto 动态代码生成简介》这篇文章中,我们简单介绍了 Presto 动态代码生成的原理以及 Presto 在计算表达式的地方会使用到动态代码生成技术。为了加深理解,本文将以两个例子介绍 Presto 里面动态代码生成的使用。
EmbedVersion
我们往 Presto 提交 SQL 查询以及 TaskExecutor 启动 TaskRunner 执行 Task 的时候都会使用到 EmbedVersion 类里面的 embedVersion 方法。embedVersion 方法其实就是初始化一个 Runnable 实例,比如启动 TaskRunner 的代码片段如下:
executor.execute(embedVersion.embedVersion(new TaskRunner()));
其中 TaskRunner 就是实现 Runnable 接口的。EmbedVersion 的 embedVersion 方法实现如下:
public Runnable embedVersion(Runnable runnable)
{
requireNonNull(runnable, "runnable is null");
try {
return (Runnable) runnableConstructor.invoke(runnable);
}
catch (Throwable throwable) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
}
其中 runnableConstructor 就是使用 ASM 进行代码生成的类,实现如下:
// 这里定义了一个类,类名大概为 Presto_null__testversion____20211011_105831_1,
// 它的父类是 Object,并实现了 Runnable 接口。
ClassDefinition classDefinition = new ClassDefinition(
a(PUBLIC, FINAL),
makeClassName(baseClassName(serverConfig)),
type(Object.class),
type(Runnable.class));
// 定义了一个名为 runnable 的局部变量,类型为 Runnable
FieldDefinition field = classDefinition.declareField(a(PRIVATE), "runnable", Runnable.class);
Parameter parameter = arg("runnable", type(Runnable.class));
// 定义了这个类的构造函数,参数为 runnable,参数类型为 Runnable
MethodDefinition constructor = classDefinition.declareConstructor(a(PUBLIC), parameter);
// 构造方法里面其实就是把参数 runnable 的值赋值给局部变量 runnable
constructor.getBody()
.comment("super(runnable);")
.append(constructor.getThis())
.invokeConstructor(Object.class)
.append(constructor.getThis())
.append(parameter)
.putField(field)
.ret();
// 定义了一个名为 run 的方法,事实上就是实现 Runnable 接口里面的 run 方法
MethodDefinition run = classDefinition.declareMethod(a(PUBLIC), "run", type(void.class));
// run 里面其实就是调用局部变量 runnable 的 run 方法
run.getBody()
.comment("runnable.run();")
.append(run.getThis())
.getField(field)
.invokeInterface(Runnable.class, "run", void.class)
.ret();
// 定义这个类,并加载到 ClassLoader 中
Class<? extends Runnable> generatedClass = defineClass(classDefinition, Runnable.class, ImmutableMap.of(), getClass().getClassLoader());
this.runnableConstructor = constructorMethodHandle(generatedClass, Runnable.class);
上面是 Presto 操作 Java 字节码并动态生成了一个类,其生成的类大概如下面所示:
package com.facebook.presto.$gen;
public final class Presto_null__testversion____20211011_105831_1 implements Runnable {
private Runnable runnable;
public Presto_null__testversion____20211011_105831_1(Runnable runnable) {
this.runnable = runnable;
}
public void run() {
this.runnable.run();
}
}
看起来内容其实很简单。EmbedVersion 类算是 Presto 里面动态代码生成简单的例子了。
concat 函数实现
下面我们来看下稍微复杂的,也就是 Presto 里面内置函数的实现。Presto 的内置函数的实现很多也是用到代码生成技术,比如 map_filter、transform_keys 以及 transform_values 等。我们这里也举一个比较简单的例子,也就是 concat 函数的实现。比如下面的 SQL 查询:
select concat(o_orderstatus, o_orderpriority) from orders limit 10;
在 Presto 里面,concat 函数的实现就是通过代码生成进行的,其实现代码可以参见 com.facebook.presto.operator.scalar.ConcatFunction。Presto 接收到上面的 SQL 查询后,会在 Coordinator 端进行解析,并生成相应的 Tasks,提交给 Worker 执行。在 Worker 端,执行 Task 的时候,会调用 LocalExecutionPlanner 的 plan 方法生成 LocalExecutionPlan 其实就是本地可执行的计划,在 plan 方法里面会调用 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor 对 Coordinator 传过来的 PlanNode 进行变量生成 PhysicalOperation。在我们的例子中,会在 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor#visitScanFilterAndProject 里面对 concat(o_orderstatus, o_orderpriority) 进行代码生成,终调用到 com.facebook.presto.operator.scalar.ConcatFunction 的 generateConcat 方法,其就是 Presto 的 concat 函数实现逻辑,如下:
// arity 代表 Concat 函数输入参数的个数
private static Class<?> generateConcat(*ignature type, int arity)
{
checkCondition(arity <= 254, NOT_SUPPORTED, "Too many arguments for string concatenation");
// 定义动态代码生成的类名,生成的类名大概是 varchar_concat2ScalarFunction_20211011_062900_3 样子的
ClassDefinition definition = new ClassDefinition(
a(PUBLIC, FINAL),
makeClassName(type.getBase() + "_concat" + arity + "ScalarFunction"),
type(Object.class));
// 生成类的构造函数,这里是使用 private 修饰的
// Generate constructor
definition.declareDefaultConstructor(a(PRIVATE));
// Generate concat()
// 定义 concat 函数的参数,比如 arg0、arg1;类型是 Slice
List<Parameter> parameters = IntStream.range(, arity)
.mapToObj(i -> arg("arg" + i, Slice.class))
.collect(toImmutableList());
// 定义一个名为 concat 的函数,它的修饰符是 public static,
// 返回类型是 Slice,输入参数是上面定义的 arg0、arg1 等。
MethodDefinition method = definition.declareMethod(a(PUBLIC, STATIC), "concat", type(Slice.class), parameters);
Scope scope = method.getScope();
BytecodeBlock body = method.getBody();
// 定义一个名为 length 的局部变量,类型为 int
Variable length = scope.declareVariable(int.class, "length");
// length 变量初始化为0
body.append(length.set(constantInt()));
// 下面是计算 concat 函数每个参数的长度(其实就是调用 string 的 length 方法)
// 然后再把得到的字符串长度加到 length 里面,并赋值给 length
for (int i = ; i < arity; ++i) {
body.append(length.set(generateCheckedAdd(length, parameters.get(i).invoke("length", int.class))));
}
// 定义一个名为 result 的局部变量,类型为 Slice
Variable result = scope.declareVariable(Slice.class, "result");
// 调用 Slices 的 allocate 方法分配出长度为 length 空间的 Slice 对象,并赋值给 result
body.append(result.set(invokeStatic(Slices.class, "allocate", Slice.class, length)));
// 定义一个名为 position 的局部变量,类型为 int,赋值为 0
Variable position = scope.declareVariable(int.class, "position");
body.append(position.set(constantInt()));
// 下面是循环调用 result 的 setBytes 方法,并分别把 arg0、arg1 里面的内容放到 result 里面去
// 后计算 arg0 或 arg1 字符串长度再加上 position 的值,结果再赋值给 position
for (int i = ; i < arity; ++i) {
body.append(result.invoke("setBytes", void.class, position, parameters.get(i)));
body.append(position.set(add(position, parameters.get(i).invoke("length", int.class))));
}
// 返回 result
body.getVariable(result)
.retObject();
// 定义生成的类,并把它加载打破 DynamicClassLoader 里面去
return defineClass(definition, Object.class, ImmutableMap.of(), new DynamicClassLoader(ConcatFunction.class.getClassLoader()));
}
private static BytecodeExpression generateCheckedAdd(BytecodeExpression x, BytecodeExpression y)
{
// 调用 ConcatFunction 类里面的 checkedAdd 静态方法
return invokeStatic(ConcatFunction.class, "checkedAdd", int.class, x, y);
}
@UsedByGeneratedCode
public static int checkedAdd(int x, int y)
{
try {
return addExact(x, y);
}
catch (ArithmeticException e) {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Concatenated string is too large");
}
}
为了方便理解,我对 generateConcat 方法的实现进行了注释,应该很好理解。为了性能问题,终生成的函数会进行缓存,下一次再调用 concat 函数,只要函数签名一样,就不用再一次进行 concat 代码的生成。比如我们前面的例子是对两个字符串进行合并(函数签名为 presto.default.concat(varchar,varchar):varchar ),如果下一次还是调用这个函数就不用再进行代码生成了。但是如果下一次是对三个字符串进行合并,还是要进行一次代码生成的。
到这里,大家可能还是不太明白 Presto 代码生成到底生成了什么东西。这里我就进一步介绍一下。如果运行我们上面的 SQL 查询,Presto 生成的 concat 实现大概如下面所示:
package com.facebook.presto.$gen;
import com.facebook.presto.operator.scalar.ConcatFunction;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
public final class varchar_concat2ScalarFunction_20211011_062900_3 {
private varchar_concat2ScalarFunction_20211011_062900_3() {
}
public static Slice concat(Slice arg0, Slice arg1) {
int length = ;
int length = ConcatFunction.checkedAdd(length, arg0.length());
length = ConcatFunction.checkedAdd(length, arg1.length());
Slice result = Slices.allocate(length);
int position = ;
result.setBytes(position, arg0);
int position = position + arg0.length();
result.setBytes(position, arg1);
int var10000 = position + arg1.length();
return result;
}
}
注意,Presto 里面生成的是 Java 字节码,这里只是为了说明的方便,给出了 Java 源代码。可以看到,终生成的代码其实很好理解。Presto 里面对两个字符串进行 concat 其实就是执行上面的代码片段。
总结
本文通过两个例子简单的介绍了 Presto 里面动态代码生成的用法,通过上面两个例子,相信大家应该能够大概了解 Presto 里面的动态代码生成的技术和用法。后面有空我会介绍一下表达式这块代码生成和整个 page 处理逻辑是怎么联系到一起的,敬请关注。
来源 https://mp.weixin.qq.com/s/jfa60JhfZlqigI1mcBTtfA
相关文章