自定义Spark Partitioner提升es-hadoop Bulk效率

2022-01-06 00:00:00 索引 数据 节点 写入 希望

前言

之前写过一篇文章,如何提高ElasticSearch 索引速度。除了对ES本身的优化以外,我现在大体思路是尽量将逻辑外移到Spark上,Spark的分布式计算能力强,cpu密集型的很适合。这篇文章涉及的调整也是对SparkES 多维分析引擎设计 中提及的一个重要概念“shard to partition ,partition to shard ” 的实现。不过目前只涉及到构建索引那块。

问题描述

当你bulk数据到集群,按照ElasticSearch Bulk 源码解析所描述的:

接着通过executeBulk方法进入原来的流程。在该方法中,对bulkRequest.requests 进行了两次for循环。

次判定如果是IndexRequest就调用IndexRequest.process方法,主要是为了解析出timestamp,routing,id,parent 等字段。

第二次是为了对数据进行分拣。大致是为了形成这么一种结构:

第二次就是对提交的数据进行分拣,然后根据route/_id 等值找到每个数据所属的Shard,后将数据发送到对应Shard所在的Node节点上。

然而这导致了两个问题:

  1. ES Node之间会形成N*N个连接,消耗掉过多的bulk线程
  2. 出现了很多并不需要的网络IO

所以我们希望能够避免这种情况。

Spark Partition to ES Shard

我们希望能够将分拣的逻辑放到Spark端,保证Spark 的Partition 和ES的Shard 一一对应,并且实现特定的Partitoner 保证数据到达ES都会被对应的Shard所在的节点直接消费,而不会再被转发到其他节点。
经过我的实际测试,做了该调整后,写入QPS有两倍以上的提升

理论基础

这里的理论基础自然是es-hadoop项目。

类的调用路径关系为:

EsSpark -> 
EsRDDWriter ->
RestService ->
RestRepository ->
RestClient ->
NetworkClient ->
CommonsHttpTransport

相关文章