HBase实操:HBase-Spark-Read-Demo 分享

2020-06-23 00:00:00 专区 订阅 付费 获取 关注

前言:本文是一个关于 Spark 读取 HBase 的小 demo,供大家简单了解一下~

相关代码:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Minimal demo that scans an HBase table from Spark through `TableInputFormat`.
 *
 * Every row of table `test_shx` is turned into the string value of column
 * `f:addr`; each row key / value pair is printed and the rows are counted to
 * force the scan to run.
 */
object SparkReadHBaseDemo {

  /** Comma-separated ZooKeeper quorum hosts used to locate the HBase cluster. */
  val HBASE_ZOOKEEPER_QUORUM = "xxx1.com.cn,xxx2.com.cn,xxx3.com.cn"

  // Column coordinates read by getRes, encoded once with HBase's own helper
  // (always UTF-8) instead of String.getBytes, which depends on the JVM's
  // default charset.
  private val ColumnFamily: Array[Byte] = Bytes.toBytes("f")
  private val AddrQualifier: Array[Byte] = Bytes.toBytes("addr")

  /**
   * Entry point: builds the Spark context, scans the HBase table and counts
   * the rows.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // Spark entry point. The master falls back to "local" for debugging only
    // when the launcher has not already supplied one, so a master passed at
    // submit time is no longer overridden.
    val conf = new SparkConf().setAppName("SparkReadHBaseDemo ")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)
    try {
      // Build the RDD of (row key, Result) pairs backed by the HBase scan.
      val hbaseRDD = sc.newAPIHadoopRDD(getHbaseConf(), classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      // count() is the action that actually triggers the scan.
      hbaseRDD.map(_._2).map(getRes(_)).count()
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }


  /**
   * Extracts the `f:addr` column of one HBase row and prints it with its row key.
   *
   * @param result a single row returned by the scan
   * @return the `f:addr` value decoded as a string
   *         (NOTE(review): presumably null when the row has no such cell —
   *         confirm against the HBase `Result.getValue` contract)
   */
  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    val addr = Bytes.toString(result.getValue(ColumnFamily, AddrQualifier))
    // Runs inside the Spark task, so the output appears on the executor's stdout.
    println(s"${rowkey}---${addr}")
    addr
  }

  /**
   * Builds the HBase/Hadoop configuration consumed by `TableInputFormat`.
   *
   * @return configuration carrying the ZooKeeper connection settings, the
   *         table name and the serialized scan
   */
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
    // Name of the table to scan.
    conf.set(TableInputFormat.INPUT_TABLE, "test_shx")
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  /**
   * Serializes the scan definition into the string form stored under
   * `TableInputFormat.SCAN`.
   *
   * @return Base64 encoding of the protobuf-serialized `Scan`
   */
  def getScanStr(): String = {
    val scan = new Scan()
    // Configure filters, column selection, row ranges, etc. on `scan` here.
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}

相关文章