How Presto Chooses Workers When Scheduling Tasks

2022-02-10

Presto's task scheduling types are defined as follows:

    public final class SystemPartitioningHandle
            implements ConnectorPartitioningHandle
    {
        private enum SystemPartitioning
        {
            SINGLE,
            FIXED,
            SOURCE,
            SCALED,
            COORDINATOR_ONLY,
            ARBITRARY
        }
    }

The common scenarios mainly involve the SINGLE, FIXED, and SOURCE types: SINGLE represents the final aggregation and output of data, FIXED represents computation over intermediate data, such as a JOIN, and SOURCE represents stages that deal directly with the source data.

Take the following SQL as an example:

    select * from (select * from test join test1 on test.id = test1.id);

Its execution plan looks like this:

[Figure: execution plan, with two leaf SOURCE stages scanning the two tables, a FIXED join stage above them, and a SINGLE output stage on top]

As the plan shows, the left and right leaf stages both schedule their tasks as SOURCE; the middle stage, which joins the two tables, is FIXED; and the topmost output stage is SINGLE, producing the final aggregated output.

Whichever the type, scheduling a task requires choosing Workers. So how does Presto choose them?

Based on the scheduling types above, fragments can be divided into two classes: non-source fragments and source fragments.

Non-source fragments

Non-source fragments rely on the SystemPartitioningHandle strategy: the number of nodes a fragment needs is determined jointly by the currently available cluster nodes and the hash partition count setting (query.initial-hash-partitions, default 100); for a FIXED fragment this is effectively min(available nodes, hash partition count). NodeSelector is then called to pick the Workers. The logic is as follows:

    public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler)
    {
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(null);
        List<Node> nodes;
        if (partitioning == SystemPartitioning.COORDINATOR_ONLY) {
            nodes = ImmutableList.of(nodeSelector.selectCurrentNode());
        }
        else if (partitioning == SystemPartitioning.SINGLE) {
            nodes = nodeSelector.selectRandomNodes(1);
        }
        else if (partitioning == SystemPartitioning.FIXED) {
            nodes = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
        }
        else {
            throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
        }

        checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

        ImmutableMap.Builder<Integer, Node> partitionToNode = ImmutableMap.builder();
        for (int i = 0; i < nodes.size(); i++) {
            Node node = nodes.get(i);
            partitionToNode.put(i, node);
        }
        return new NodePartitionMap(partitionToNode.build(), split -> {
            throw new UnsupportedOperationException("System distribution does not support source splits");
        });
    }

As the code above shows, SINGLE and FIXED choose Nodes according to the task's scheduling type, but both ultimately call the same function, whose logic is:

    public List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes)
    {
        return selectNodes(limit, randomizedNodes(nodeMap.get().get(), includeCoordinator, excludedNodes));
    }
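In essence, picking N random workers reduces to draining a randomized iterator over the candidates until the limit is hit. A minimal standalone sketch of that idea (hypothetical helper names, not Presto's actual selectNodes):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical sketch: pick up to `limit` workers from an iterator that
    // yields candidate nodes in random order.
    final class NodeSelectionSketch
    {
        static <T> List<T> selectUpTo(int limit, Iterator<T> randomizedCandidates)
        {
            List<T> selected = new ArrayList<>();
            while (randomizedCandidates.hasNext() && selected.size() < limit) {
                selected.add(randomizedCandidates.next());
            }
            return selected;
        }
    }

This also explains why a FIXED fragment lands on fewer nodes than query.initial-hash-partitions when the cluster has fewer live workers: the iterator simply runs out of candidates before the limit is reached.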

Source fragments

A source fragment depends on its connector id. Taking the hive connector as an example, its worker nodes register themselves through the discovery service; the concrete implementation is in DiscoveryNodeManager#refreshNodesInternal. When a task is actually scheduled and each split needs a Node assigned, the DynamicSplitPlacementPolicy strategy calls the following method:

    public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator, Set<Node> excludedNodes)
    {
        ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
                .filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
                .filter(node -> !excludedNodes.contains(node))
                .collect(toImmutableList());
        return new ResettableRandomizedIterator<>(nodes);
    }
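The randomization itself comes from ResettableRandomizedIterator. A simplified sketch of the mechanism (an assumption for illustration, not the actual class): a lazy Fisher-Yates shuffle that swaps a randomly chosen remaining element into the current position on each next(), so that a reset() re-iterates the same list in a fresh random order.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    // Simplified sketch of a resettable randomized iterator (lazy Fisher-Yates).
    final class RandomizedIteratorSketch<T> implements Iterator<T>
    {
        private final List<T> list;
        private int position;

        RandomizedIteratorSketch(List<T> elements)
        {
            this.list = new ArrayList<>(elements);
        }

        // Start over; the next pass yields a new random order.
        public void reset()
        {
            position = 0;
        }

        @Override
        public boolean hasNext()
        {
            return position < list.size();
        }

        @Override
        public T next()
        {
            // Swap a random element from the remaining suffix into `position`.
            int target = position + ThreadLocalRandom.current().nextInt(list.size() - position);
            T result = list.get(target);
            list.set(target, list.get(position));
            list.set(position, result);
            position++;
            return result;
        }
    }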

As you can see, whichever selection path is taken, everything ultimately goes through the NodeMap. So where does the NodeMap come from? As the earlier code shows, it is built when nodeScheduler.createNodeSelector is called:

    public NodeSelector createNodeSelector(ConnectorId connectorId)
    {
        // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
        // done as close to when the split is about to be scheduled
        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
            ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();

            Set<Node> nodes;
            if (connectorId != null) {
                nodes = nodeManager.getActiveConnectorNodes(connectorId);
            }
            else {
                nodes = nodeManager.getNodes(ACTIVE);
            }

            Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                    .map(Node::getNodeIdentifier)
                    .collect(toImmutableSet());

            for (Node node : nodes) {
                // ... (rest omitted)

Here we have found the source: Presto's machines are managed by Discovery. The nodeManager above, i.e. DiscoveryNodeManager, wraps the Discovery service interface, and Discovery maintains the alive/dead state of the Workers.
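One detail worth noting in createNodeSelector above is the Suppliers.memoizeWithExpiration wrapper: the NodeMap is not rebuilt from the node manager on every call, but cached and only recomputed after an expiry interval. A small self-contained illustration of that Guava pattern (the 5-second window is illustrative, not Presto's configured value):

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;
    import java.util.concurrent.TimeUnit;

    public final class MemoizeDemo
    {
        public static void main(String[] args)
        {
            // Stand-in for the expensive NodeMap construction.
            Supplier<Long> cached = Suppliers.memoizeWithExpiration(
                    () -> {
                        System.out.println("rebuilding node map...");
                        return System.nanoTime();
                    },
                    5, TimeUnit.SECONDS);

            System.out.println(cached.get()); // first call computes the value
            System.out.println(cached.get()); // within 5s: served from cache
        }
    }

The trade-off is that node selection may briefly operate on a slightly stale view of the cluster, in exchange for not hitting the discovery state on every call.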

Once the Workers have been chosen, the stage's scheduler starts dispatching tasks and splits. Again depending on the SystemPartitioningHandle, source stages use SourcePartitionedScheduler or FixedSourcePartitionedScheduler, while non-source stages are scheduled by FixedCountScheduler. How many tasks a stage dispatches to the Workers, and how splits are dispatched, will be covered in a follow-up post.
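To make the non-source path concrete before that follow-up, here is a minimal sketch of the FixedCountScheduler idea (a hypothetical simplification, not Presto's class): once the partition-to-node map is fixed, scheduling a non-source stage amounts to launching one task per partition on its assigned node.

    import java.util.List;
    import java.util.function.BiConsumer;

    // Hypothetical sketch: a fixed-count scheduler launches exactly one task
    // per (partition, node) pair from the precomputed partition-to-node map.
    final class FixedCountSchedulerSketch<N>
    {
        private final List<N> partitionToNode;             // index = partition id
        private final BiConsumer<N, Integer> taskLauncher; // (node, partition) -> launch task

        FixedCountSchedulerSketch(List<N> partitionToNode, BiConsumer<N, Integer> taskLauncher)
        {
            this.partitionToNode = partitionToNode;
            this.taskLauncher = taskLauncher;
        }

        void schedule()
        {
            for (int partition = 0; partition < partitionToNode.size(); partition++) {
                taskLauncher.accept(partitionToNode.get(partition), partition);
            }
        }
    }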

Source: https://mp.weixin.qq.com/s/N7zvXCj3rrdJwBVcgwVS2Q
