首页新闻动态正文

HBase 中加盐(Salting)之后的表如何读取:Spar...【黑马大数据培训】

更新时间:2019年07月29日 15时44分30秒 来源:黑马程序员论坛



HBase 中加盐(Salting)之后的表如何读取:Spark 篇

      在 《HBase 中加盐(Salting)之后的表如何读取:协处理器篇》 文章中介绍了使用协处理器来查询加盐之后的表,本文将介绍第二种方法来实现相同的功能。
      我们知道,Hbase为我们提供了 hbase-mapreduce 工程包含了读取 HBase 表的 InputFormat、OutputFormat 等类。这个工程的描述如下:
This module contains implementations of InputFormat, OutputFormat, Mapper, Reducer, etc which are needed for running MR jobs on tables, WALs, HFiles and other
Hbase specific constructs. It also contains a bunch of tools: RowCounter, ImportTsv, Import, Export, CompactionTool, ExportSnapshot, WALPlayer, etc.
我们也知道,虽然上面描述的是 MR jobs,但是 Spark 也是可以使用这些 InputFormat、OutputFormat 来读写 HBase 表的,如下:
[Scala] 纯文本查看 复制代码
val sparkSession = SparkSession.builder
  .appName("HBase")
  .getOrCreate()
 
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "https://www.iteblog.com:2181")
conf.set(TableInputFormat.INPUT_TABLE, "iteblog")
 
val HBaseRdd = sparkSession.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
 
println(HBaseRdd.count())

      上面程序使用 TableInputFormat 计算了 iteblog 表的总行数。如果我们想查询某个 UID 的所有历史记录如何实现呢?如果你查看 TableInputFormat 代码,你会发现其包含了很大参数设置:
[Shell] 纯文本查看 复制代码
hbase.mapreduce.inputtable
hbase.mapreduce.splittable
hbase.mapreduce.scan
hbase.mapreduce.scan.row.start
hbase.mapreduce.scan.row.stop
hbase.mapreduce.scan.column.family
hbase.mapreduce.scan.columns
hbase.mapreduce.scan.timestamp
hbase.mapreduce.scan.timerange.start
hbase.mapreduce.scan.timerange.end
hbase.mapreduce.scan.maxversions
hbase.mapreduce.scan.cacheblocks
hbase.mapreduce.scan.cachedrows
hbase.mapreduce.scan.batchsize
hbase.mapreduce.inputtable.shufflemaps

      其中 hbase.mapreduce.inputtable 就是需要查询的表,也就是上面Spark 程序里面的 TableInputFormat.INPUT_TABLE。而 hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop 分别对应的是需要查询的起止 Rowkey,所以我们可以利用这个信息来实现某个范围的数据查询。但是要注意的是,iteblog 这张表是加盐了,所以我们需要在 UID 之前加上一些前缀,否则是查询不到数据的。不过 TableInputFormat 并不能实现这个功能。那如何处理呢?答案是重写 TableInputFormat 的 getSplits 方法。
      从名字也可以看出 getSplits 是计算有多少个 Splits。在 HBase 中,一个 Region 对应一个 Split,对应于 TableSplit 实现类。TableSplit 的构造是需要传入 startRow 和 endRow。startRow 和 endRow 对应的就是上面 hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop 参数传进来的值,所以如果我们需要处理加盐表,就需要在这里实现。
      另一方面,我们可以通过 RegionLocator 的 getStartEndKeys() 拿到某张表所有 Region 的 StartKeys 和 EndKeys 的。然后将拿到的 StartKey 和用户传进来的 hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop 值进行拼接即可实现我们要的需求。根据这个思路,我们的代码就可以按照如下实现:
[Scala] 纯文本查看 复制代码
package com.iteblog.data.spark
 
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
 
import scala.collection.JavaConversions._
 
object Spark {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .appName("HBase")
      .getOrCreate()
 
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "https://www.iteblog.com:2181")
    conf.set(TableInputFormat.INPUT_TABLE, "iteblog")
    conf.set(TableInputFormat.SCAN_ROW_START, "1000")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "1001")
 
    val HBaseRdd = sparkSession.sparkContext.newAPIHadoopRDD(conf, classOf[SaltRangeTableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
 
    HBaseRdd.foreach { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val cell = result.listCells()
      cell.foreach { item =>
        val family = Bytes.toString(item.getFamilyArray, item.getFamilyOffset, item.getFamilyLength)
        val qualifier = Bytes.toString(item.getQualifierArray,
          item.getQualifierOffset, item.getQualifierLength)
        val value = Bytes.toString(item.getValueArray, item.getValueOffset, item.getValueLength)
        println(rowKey + " \t " + "column=" + family + ":" + qualifier + ", " +
          "timestamp=" + item.getTimestamp + ", value=" + value)
      }
    }
  }
}

      然后我们同样查询 UID = 1000 的用户所有历史记录,那么我们的程序可以如下实现:
[Scala] 纯文本查看 复制代码
package com.iteblog.data.spark
 
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
 
import scala.collection.JavaConversions._
 
object Spark {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .appName("HBase")
      .getOrCreate()
 
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "https://www.iteblog.com:2181")
    conf.set(TableInputFormat.INPUT_TABLE, "iteblog")
    conf.set(TableInputFormat.SCAN_ROW_START, "1000")
    conf.set(TableInputFormat.SCAN_ROW_STOP, "1001")
 
    val HBaseRdd = sparkSession.sparkContext.newAPIHadoopRDD(conf, classOf[SaltRangeTableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
 
    HBaseRdd.foreach { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val cell = result.listCells()
      cell.foreach { item =>
        val family = Bytes.toString(item.getFamilyArray, item.getFamilyOffset, item.getFamilyLength)
        val qualifier = Bytes.toString(item.getQualifierArray,
          item.getQualifierOffset, item.getQualifierLength)
        val value = Bytes.toString(item.getValueArray, item.getValueOffset, item.getValueLength)
        println(rowKey + " \t " + "column=" + family + ":" + qualifier + ", " +
          "timestamp=" + item.getTimestamp + ", value=" + value)
      }
    }
  }
}

我们编译打包上面的程序,然后使用下面命令运行上述程序:
[Shell] 纯文本查看 复制代码
bin/spark-submit --class com.iteblog.data.spark.Spark 
                 --master yarn 
                 --deploy-mode cluster 
                 --driver-memory 2g 
                 --executor-memory 2g ~/hbase-1.0-SNAPSHOT.jar

得到的结果如下:
[Shell] 纯文本查看 复制代码
A-1000-1550572395399     column=f:age, timestamp=1549091990253, value=54
A-1000-1550572395399     column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
A-1000-1550572413799     column=f:age, timestamp=1549092008575, value=4
A-1000-1550572413799     column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
A-1000-1550572414761     column=f:age, timestamp=1549092009531, value=33
A-1000-1550572414761     column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
B-1000-1550572388491     column=f:age, timestamp=1549091983276, value=1
B-1000-1550572388491     column=f:uuid, timestamp=1549091983276, value=cf720efe-2ad2-48d6-81b8
B-1000-1550572392922     column=f:age, timestamp=1549091987701, value=7
B-1000-1550572392922     column=f:uuid, timestamp=1549091987701, value=8a047118-e130-48cb-adfe
.....

和前面文章使用 HBase Shell 输出结果一致。
转载自过往记忆(https://www.iteblog.com/)


推荐了解热门学科

java培训 Python人工智能 Web前端培训 PHP培训
区块链培训 影视制作培训 C++培训 产品经理培训
UI设计培训 新媒体培训 软件测试培训 Linux运维
大数据培训 智能机器人软件开发




传智播客是一家致力于培养高素质软件开发人才的科技公司“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。

传智播客从未停止思考

传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”

中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。

何为中高级程序员课程?

传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。




黑马程序员热门视频教程

Python入门教程完整版(懂中文就能学会) 零起点打开Java世界的大门
C++| 匠心之作 从0到1入门学编程 PHP|零基础入门开发者编程核心技术
Web前端入门教程_Web前端html+css+JavaScript 软件测试入门到精通


在线咨询 我要报名
和我们在线交谈!