Elasticsearch 集成hbase实现

2020-05-27 00:00:00 索引 数据 函数 操作 钩子

一、HBase概述

  • 是一个分布式nosql的非关系数据库.
  • 基于HDFS适合海量数据存储
  • 根据RowKey查询数据,效率高
  • 模糊查询效率非常差
  • 随意组合的多条件查询非常差

1.Hbase Observer

HBase 0.92版本后推出了Coprocessor -- 协处理器,一个工作在Master/RegionServer中的框架,能运行用户的代码,从而灵活地完成分布式数据处理的任务。

Coprocessor包含两个组件,一个是EndPoint(类似关系型数据库的存储过程),用以加快特定查询的响应,另一个就是Observer(类似关系型数据库的触发器)。Observer也分为几个类型,其中RegionObserver提供了一组表数据操作的钩子函数,覆盖了Get、Put、Scan、Delete等操作(通常有pre和post两种情况,表示在操作发生之前或发生之后),我们可以通过重载这些钩子函数,利用RegionServer实现特定的数据处理需求。

基于RegionObserver的钩子函数,我们可以覆盖Put及Delete方法来实现Hbase和ES直接的数据同步。

2. 方案目标

  • 实现对HBase高性能的范围检索
  • 保证数据的低冗余
  • 保持数据的一致性

3.方案流程


  • 数据进入HBase时,利用Observer同步进入ES索引库;
  • 客户端根据查询条件,利用ES提供的Java API对ES发起查询请求;
  • ES返回符合条件的RowKey;
  • 客户端再根据RowKey去HBase获取数据;
  • 后HBase返回结果集。

4. 通过HBase Observer同步数据到ElasticSearch

4.1.Observer解决的问题

HBase是一个分布式的存储体系,数据按照RowKey分成不同的Region,再分配给RegionServer管理。但是RegionServer只承担了存储的功能,如果Region能拥有一部分的计算能力,从而实现一个HBase框架上的MapReduce,那HBase的操作性能将进一步提升。正是为了解决这一问题,HBase 0.92版本后推出了Coprocessor -- 协处理器,一个工作在Master/RegionServer中的框架,能运行用户的代码,从而灵活地完成分布式数据处理的任务。

Coprocessor包含两个组件,一个是EndPoint(类似关系型数据库的存储过程),用以加快特定查询的响应,另一个就是Observer(类似关系型数据库的触发器)。Observer也分为几个类型,其中RegionObserver提供了一组表数据操作的钩子函数,覆盖了Get、Put、Scan、Delete等操作(通常有pre和post两种情况,表示在操作发生之前或发生之后),我们可以通过重载这些钩子函数,利用RegionServer实现特定的数据处理需求。

4.2.应用场景

在同一个主机集群上同时建立了HBase集群和ElasticSearch集群,存储到HBase的数据必须实时地同步到ElasticSearch。而HBase和ElasticSearch都没有更新的概念,我们的需求可以简化为两步:

· 当一个新的Put操作产生时,将Put数据转化为json,索引到ElasticSearch,并把RowKey作为新文档的ID

· 当一个新的Delete操作产生时,获取Delete数据的RowKey,删除ElasticSearch中对应的ID

4.3.Java实现

Observer的Java实现并不复杂,只需要继承BaseRegionObserver的基类,并重载postPut和postDelete两个函数。考虑到未来HBase的写入会比较频繁,我们利用ElasticSearch的Bulk API做了一个缓冲池:不是每次提交HBase数据都触发索引操作,而是积累到一定数量或者到达一定时间间隔才去批量操作,从而降低了RegionServer的网络I/O压力。

4.4.Observer的部署

Observer提供了两种部署方式:

1. 全局部署。把jar包的路径加入HBASE_CLASSPATH并且修改hbase-site.xml,这样Observer会对每一个表都生效。

2. 单表部署。通过HBase Shell修改表结构,加入coprocessor信息。

显然后一种更加灵活。通过HBase Shell安装Observer的详细步骤如下:

· 把Java项目打包为jar包,上传到HDFS的特定路径

· 进入HBase Shell,disable你希望加载的表

· 通过以下指令激活Observer:

alter 'table_name', METHOD => 'table_att', 'coprocessor' => 'hdfs:///your/jar/path/on/hdfs|com.foo.bar|1001|es_cluster=elasticsearch,es_type=video,es_index=demo,es_port=9300,es_host=localhost'

相关文章