手动模拟 Trino 客户端,详解客户端与服务端的交互报文
没错,这张图还是之前文章里的那一张
我在本地写了一小段代码,模拟客户端向 Trino 的 Coordinator 发送请求。代码主要来源于 Trino-client 中的 StatementClientV1#buildQueryResult 以及 StatementClientV1#advance()
本质就是利用 OKHttp3 向 Coordinator 发送 POST 和 GET 请求
代码的几个关键点:
- 如果要对 Coordinator 进行 debug,由于客户端本身设置的有超时时长,后续需要客户端继续发请求的时候,客户端早就超时失败了,因此可以设置 readTimeout 防止超时
- query 的内容是一个字符串,不要带分号
- 由于数据是分批获取,因此可能某几次的 response 中包含部分结果,需要将数据都存放在 List 中,后打印输出
- 如果某次的 response 中有数据返回,那么执行 getData 方法就不为空,此时就可以放置到 res 中了
import io.airlift.json.JsonCodec;
import io.trino.client.JsonResponse;
import io.trino.client.QueryResults;
import okhttp3.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.airlift.json.JsonCodec.jsonCodec;
public class TestTrinoTest {
private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static void main(String[] args) throws URISyntaxException {
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
// 创建客户端,这里设置了 readTimeout 和 callTimeout
// 设置的目的是 debug Coordinator 的时候客户端会超时失败,导致后续无法进行
OkHttpClient client = clientBuilder
.callTimeout(1, TimeUnit.DAYS)
.readTimeout(1, TimeUnit.DAYS)
.build();
// 客户端发送 POST 请求的 Uri
URI postUri = new URI("http://localhost:8080/v1/statement");
HttpUrl url = HttpUrl.get(postUri);
// 要查询的语句,注意这里不能带上,语句不能带分号,带上会报错
// show catalogs 而不是 show catalogs;
String query = "show catalogs";
Request.Builder builder = new Request.Builder()
.url(url)
.post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), query))
.addHeader("X-Trino-User", "trino")
.addHeader("User-Agent", "StatementClientV1/356");
// 构建了 post 请求
Request post = builder.build();
// 接受 QueryResult 的返回结果
JsonResponse<QueryResults> response;
// 执行请求将结果给 response
response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, post);
System.out.println(response.getResponseBody());
// 上面是 POST 请求的过程
// 下面就是客户端发送 GET 请求的过程了
// 首先获取 nextUri
URI nextUri = response.getValue().getNextUri();
System.out.println(nextUri);
// 用于存放查询获取的数据
List<Object> res = new ArrayList<>();
// 如果 nextUri 不为空,就一直发送 GET 请求
while (nextUri != null) {
Request get = new Request.Builder()
.get()
.url(HttpUrl.get(nextUri))
.addHeader("X-Trino-User", "trino")
.addHeader("User-Agent", "StatementClientV1/356")
.build();
response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, get);
nextUri = response.getValue().getNextUri();
System.out.println(response.getResponseBody());
// 如果获取到了数据,getData 就不为空,此时将数据全塞进 res 中
if (response.getValue().getData() != null) {
res.addAll((Collection<?>) response.getValue().getData());
}
System.out.println("nextUri 的值是: " + nextUri);
}
System.out.println("==========");
// 后遍历输出结果就可以了
for (int i = ; i < res.size(); i++) {
System.out.println(res.get(i));
}
}
}
相关文章