手动模拟 Trino 客户端,详解客户端与服务端的交互报文

2022-05-12 00:00:00 客户端 发送 请求 返回 超时

本文章的目的主要是为了搞清楚 Trino 的 Coordinator 接收到客户端发送的请求后,会返回给客户端什么响应,之前在文章里提到了返回的结果被包装在 QueryResult 中,因此本文对 QueryResult 的内容进行打印输出,看看一次查询中客户端和 Trino 的 Coordinator 的交互过程,同时也可以了解到客户端的本质。

没错,这张图还是之前文章里的那一张

我在本地写了一小段代码,模拟客户端向 Trino 的 Coordinator 发送请求。代码主要来源于 Trino-client 中的 StatementClientV1#buildQueryResult 以及 StatementClientV1#advance()

本质就是利用 OKHttp3 向 Coordinator 发送 POST 和 GET 请求

代码的几个关键点:

  1. 如果要对 Coordinator 进行 debug,由于客户端本身设置的有超时时长,后续需要客户端继续发请求的时候,客户端早就超时失败了,因此可以设置 readTimeout 防止超时
  2. query 的内容是一个字符串,不要带分号
  3. 由于数据是分批获取,因此可能某几次的 response 中包含部分结果,需要将数据都存放在 List 中,后打印输出
  4. 如果某次的 response 中有数据返回,那么执行 getData 方法就不为空,此时就可以放置到 res 中了
import io.airlift.json.JsonCodec;
import io.trino.client.JsonResponse;
import io.trino.client.QueryResults;
import okhttp3.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static io.airlift.json.JsonCodec.jsonCodec;

public class TestTrinoTest {
    private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);

    public static void main(String[] args) throws URISyntaxException {

        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        // 创建客户端,这里设置了 readTimeout 和 callTimeout
        // 设置的目的是 debug Coordinator 的时候客户端会超时失败,导致后续无法进行
        OkHttpClient client = clientBuilder
                .callTimeout(1, TimeUnit.DAYS)
                .readTimeout(1, TimeUnit.DAYS)
                .build();
        // 客户端发送 POST 请求的 Uri
        URI postUri = new URI("http://localhost:8080/v1/statement");
        HttpUrl url = HttpUrl.get(postUri);
        // 要查询的语句,注意这里不能带上,语句不能带分号,带上会报错
        // show catalogs  而不是 show catalogs;
        String query = "show catalogs";
        Request.Builder builder = new Request.Builder()
                .url(url)
                .post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), query))
                .addHeader("X-Trino-User", "trino")
                .addHeader("User-Agent", "StatementClientV1/356");
        // 构建了 post 请求
        Request post = builder.build();
        // 接受 QueryResult 的返回结果
        JsonResponse<QueryResults> response;
        // 执行请求将结果给 response
        response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, post);
        System.out.println(response.getResponseBody());
        // 上面是 POST 请求的过程
        // 下面就是客户端发送 GET 请求的过程了
        // 首先获取 nextUri
        URI nextUri = response.getValue().getNextUri();
        System.out.println(nextUri);
        // 用于存放查询获取的数据
        List<Object> res = new ArrayList<>();
        // 如果 nextUri 不为空,就一直发送 GET 请求
        while (nextUri != null) {
            Request get = new Request.Builder()
                    .get()
                    .url(HttpUrl.get(nextUri))
                    .addHeader("X-Trino-User", "trino")
                    .addHeader("User-Agent", "StatementClientV1/356")
                    .build();
            response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, get);
            nextUri = response.getValue().getNextUri();
            System.out.println(response.getResponseBody());
            // 如果获取到了数据,getData 就不为空,此时将数据全塞进 res 中
            if (response.getValue().getData() != null) {
                res.addAll((Collection<?>) response.getValue().getData());
            }
            System.out.println("nextUri 的值是: " + nextUri);
        }
        System.out.println("==========");
        // 后遍历输出结果就可以了
        for (int i = ; i < res.size(); i++) {
            System.out.println(res.get(i));
        }
    }
}

相关文章