自定义Spark Partitioner提升es-hadoop Bulk效率
前言
之前写过一篇文章,如何提高ElasticSearch 索引速度。除了对ES本身的优化以外,我现在大体思路是尽量将逻辑外移到Spark上,Spark的分布式计算能力强,cpu密集型的很适合。这篇文章涉及的调整也是对SparkES 多维分析引擎设计 中提及的一个重要概念“shard to partition ,partition to shard ” 的实现。不过目前只涉及到构建索引那块。
问题描述
当你bulk数据到集群,按照ElasticSearch Bulk 源码解析所描述的:
接着通过executeBulk方法进入原来的流程。在该方法中,对bulkRequest.requests 进行了两次for循环。
次判定如果是IndexRequest就调用IndexRequest.process方法,主要是为了解析出timestamp,routing,id,parent 等字段。
第二次是为了对数据进行分拣。大致是为了形成这么一种结构:
第二次就是对提交的数据进行分拣,然后根据route/_id 等值找到每个数据所属的Shard,后将数据发送到对应Shard所在的Node节点上。
然而这导致了两个问题:
- ES Node之间会形成N*N个连接,消耗掉过多的bulk线程
- 出现了很多并不需要的网络IO
所以我们希望能够避免这种情况。
Spark Partition to ES Shard
我们希望能够将分拣的逻辑放到Spark端,保证Spark 的Partition 和ES的Shard 一一对应,并且实现特定的Partitoner 保证数据到达ES都会被对应的Shard所在的节点直接消费,而不会再被转发到其他节点。
经过我的实际测试,做了该调整后,写入QPS有两倍以上的提升
理论基础
这里的理论基础自然是es-hadoop项目。
类的调用路径关系为:
EsSpark ->
EsRDDWriter ->
RestService ->
RestRepository ->
RestClient ->
NetworkClient ->
CommonsHttpTransport
相关文章