Citus 三种扫描方式的一些实现细节

2022-04-13 00:00:00 查询 执行 调用 获取 循环

这一部分是CustomScan的三种实现。

在pg shard中,我们可以看到从分片查询的数据,会汇总到临时表,然后提供到上层查询,在citus里面,这里进行了优化,针对adaptive(OLTP)和TaskTracker(OLAP),insert select分别做了不同的处理,并非仅仅依赖临时表。

首先说AdaptiveExecutorCustomScanMethods

这个对象终指向AdaptiveExecutorCustomExecMethods,其中CitusExecScan就代表了Adaptive的扫描办法

AdaptiveExecutor 实际的执行器

调用ExecuteSubPlans执行下属执行计划
CreateDistributedExecution创建执行器
StartDistributedExecution 启动执行器
    创建2PC相关的锁,以及外键表的检查加锁
对于本地查询 执行RunLocalExecution,终调用ExecuteLocalTaskPlan执行
    CreateQueryDesc 创建查询
    ExecutorStart 开始查询
    ExecutorRun 执行查询
    ExecutorFinish结束查询
    ExecutorEnd 关闭查询
    FreeQueryDesc 释放资源
对于需要顺序查询的任务,执行SequentialRunDistributedExecution
    对task逐个循环调用RunDistributedExecution
对于可以并行额任务,执行RunDistributedExecution
    这里通过异步IO监听处理WaitEventSetWait
    每个task的event会绑定到WaitEventSetWait execution->waitEventSet
    每次循环中,会再次循环检查所有wait状态来处理遇到的错误:postmaster故障,中断等
    ConnectionStateMachine连接状态机负责连接的处理过程
    TransactionStateMachine事务状态机负责事务的处理
        REMOTE_TRANS_STARTED 执行StartPlacementExecutionOnSession发送SQL
        REMOTE_TRANS_SENT_COMMAND 执行ReceiveResults接收结果集
对于非只读执行,这里也负责记录影响行总数executorState->es_processed += execution->rowsProcessed
FinishDistributedExecution结束并清理数据结构
如果有sort需求,并且是returing关键字指定的返回,这里调用SortTupleStore排序

CitusExecScan会直接调用ReturnTupleFromTuplestore获取到存储的结果集(可以类比临时表)

与之对应的,是TaskTrackerExecScan,以及CoordinatorInsertSelectExecScan。

TaskTrackerExecScan则不一样,对于未完成(!finishedRemoteScan)的扫描,会从分布式执行计划获取到job与对应的查询语句,并且,这里会检查确认没有复合子查询和CTE查询(tasktracker模式下不支持这两种查询)

随后对相关表加锁LockPartitionsInRelationList,创建对应文件夹PrepareMasterJobDirectory,终执行MultiTaskTrackerExecute并缓存(LoadTuplesIntoTupleStore)入执行结果集合scanState。

MultiTaskTrackerExecute

首先通过hash表分配连接TrackerHash
    taskTrackerHash 执行查询
    transmitTrackerHash 获取结果文件
随后while true循环,知道全部成功,或者遇到失败
    ManageTaskExecution 调度任务执行到TaskExecStatus
    ManageTransmitExecution获取执行结果到TransmitExecStatus
    失败的情况直接跳出
TrackerCleanupResources 清理掉无用资源
处理期间产生的报错信息

CoordinatorInsertSelectExecScan

LockPartitionRelations 锁表
这里insert into select,进行hash关系重排,保证每个shard查询出来的语句直接对应到目标shard,避免额外的消耗
    函数ExecuteSelectIntoColocatedIntermediateResults生成对应结果
        BuildColumnNameListFromTargetList 获取列对应关系
        PartitionColumnIndexFromColumnList 获取分区列对应关系
        CreateCitusCopyDestReceiver 设置目标表的接收者
        ExecuteQueryIntoDestReceiver 实际执行

相关文章