入门"/>
【SpringBoot+HBase 】快速入门
SpringBoot+HBase 快速入门文档
- 前言
- 一、HBase常用shell命令
- 1.1 基础命令
- 1.2 表的基本操作
- 1.3 表的数据增删改查指令
- 1.3.1 HBase数据模型
- 1.3.2 表的增删改查指令
- 1.3.3 过滤器条件查询
- 二、JavaAPI操作Hbase
- 2.1 Maven依赖
- 2.2 复制HBase和Hadoop配置文件
- 2.3 创建Hbase连接以及admin管理对象
- 2.4 使用JavaAPI创建一个表
- 2.5 插入一条数据到HBase表中
- 三. 千万级数据hbase查询demo
- 3.1 案例背景
- 3.1 设计rowkey规则
- 3.2 预分区
- 3.3 模拟数据存储
- 3.4 创建hbase表数据映射对象
- 3.5 通过rowkey查询某一行数据
- 3.6 获取表中所有数据
- 3.7 过滤器组合查询
- 3.7.1 模糊查询
- 3.7.2 可降采样的查询指定传感器某一时间范围内的所有数据
- 3.7.3 可降采样查询某一时间范围内多个传感器的值
- 四、其他小知识
- 后记
前言
本文是在已经搭建好的hbase数据库基础上进行的JavaAPI开发。HBASE搭建可以参考下面这篇文章,如果在自己电脑上搭建,配置不够用,只搭建一个单节点的hbase也行。
WMware上搭建基于Ubuntu18.04的Zookeeper+Hadoop+HBase集群
本文是一个单节点的hbase,搭建在2核4G的Ubuntu18.04系统的云服务器上。如下面进程所示,Zookeeper+Hadoop+HBase都是搭建在该服务器上。
root@whut:~/bin# jps
1232 jar
3457 NameNode
4065 SecondaryNameNode
5251 JobHistoryServer
4452 ResourceManager
5892 HRegionServer
2967 QuorumPeerMain
4697 NodeManager
5626 HMaster
7340 Jps
27213 jar
3693 DataNode
通过http://ip:16010
查看hbase的状态,可以发现该hbase只有一个节点。如果是云服务器搭建,记得开放相应16010
端口。
题外话:hbase和hadoop的运行机制和原理比较复杂,但是作为入门来说,只需要把hbase当做一个像MySql一样的数据库,会使用即可,等入门以后在了解原理也不迟。
一、HBase常用shell命令
本章是通过hbase自带的shell命令,对数据表完成常见的增删改查
操作,同时有助于后文对Hbase的JavaAPI的理解。
- 进入hbase命令行通过以下命令
root@whut:~/bin# hbase shell
输入指令后,进入如下所示界面
1.1 基础命令
- 查看hbase版本
hbase(main):001:0> version
1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr 6 19:36:54 PDT 2017
- 查看服务器状态
hbase(main):002:0> status
1 active master, 0 backup masters, 1 servers,
1.2 表的基本操作
- 查看所有表
hbase(main):001:0> list
TABLE
WD_TABLE
1 row(s) in 0.2690 seconds
- 创建表
命令行格式:create tablename 列族1,列族2,…
例如:创建表名:数据表(DATA_TABLE),温度数据(WD_TABLE),应变数据(YB_DATA)
hbase(main):001:0> create 'DATA_TABLE','WD_DATA','YB_DATA'
0 row(s) in 2.5610 seconds=> Hbase::Table - DATA_TABLE
- 查看表的基本信息
hbase(main):003:0> describe 'DATA_TABLE'
Table DATA_TABLE is ENABLED
DATA_TABLE
COLUMN FAMILIES DESCRIPTION
{NAME => 'WD_DATA', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING=> 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_S
COPE => '0'}
{NAME => 'YB_DATA', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING=> 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_S
COPE => '0'}
2 row(s) in 0.1220 seconds
可以看到两个列族,‘WD_DATA’,‘YB_DATA’
- 检查某个表是否存在
hbase(main):005:0> exists 'DATA_TABLE'
Table DATA_TABLE does exist
0 row(s) in 0.0140 seconds
- 禁用/启用表
- 禁用表
disable ‘DATA_TABLE ’
- 检查表是否被禁用
is_disabled ‘DATA_TABLE ’
- 启用表
enable ‘DATA_TABLE ’
- 检查表是否被启用
is_enabled ‘DATA_TABLE ’
- 表的删除
删除表之前,需要先禁用表,否则会报error
hbase(main):008:0> drop 'DATA_TABLE'
ERROR: Table DATA_TABLE is enabled. Disable it first.
hbase(main):010:0> disable 'DATA_TABLE'
0 row(s) in 2.2430 seconds
hbase(main):011:0> drop 'DATA_TABLE'
0 row(s) in 1.2460 seconds
hbase(main):017:0> list
TABLE
0 row(s) in 0.0140 seconds
- 清空表的数据
实际是做了以下3件事:禁用表-删除表-创建表(因此不会删除表结构,如列簇)
语法格式:truncate 表名
truncate 'DATA_TABLE'
1.3 表的数据增删改查指令
在进行增删改查之前,我们首先要简单了解一下hbase的数据表结构,hbase是非关系型数据库(NoSQL),数据存储以键值对形式。
这里以一个公司员工表为案例来讲解,此表中包含员工基本信息(员工姓名、年龄),员工详细信息(工资、角色),以及时间戳。整体表结构如下
如上,每一行有一个RowKey用于唯一地标识和定位行,各行数据按RowKey的字典序排列。其中ImployeeBasicInfoCLF和DetailInfoCLF是两个列族,列族下又有多个具体列。(员工基本信息列族:姓名、年龄。详细信息列族:薪水、角色)
1.3.1 HBase数据模型
-
命名空间
命名空间是对表的逻辑分组,不同的命名空间类似于关系型数据库中的不同的Database数据库。利用命名空间,在多租户场景下可做到更好的资源和数据隔离。 -
表
对应于关系型数据库中的一张张表,HBase以“表”为单位组织数据,表由多行组成。 -
行
行由一个RowKey和多个列族组成,一个行有一个RowKey,用来唯一标示。 -
列族
每一行由若干列族组成,每个列族下可包含多个列,如上ImployeeBasicInfoCLF和DetailInfoCLF即是两个列族。列族是列共性的一些体现。注意:物理上,同一列族的数据存储在一起的。 -
列限定符
列由列族和列限定符唯一指定,像如上的name、age即是ImployeeBasicInfoCLF列族的列限定符。 -
单元格
单元格由RowKey、列族、列限定符唯一定位,单元格之中存放一个值(Value)和一个版本号。 -
时间戳
单元格内不同版本的值按时间倒序排列,最新的数据排在最前面
1.3.2 表的增删改查指令
- 增加列簇
命令行格式:alter tablename 列族1,列族2,…
在DATA_TABLE
原有的WD_DATA
,YB_DATA
的基础上再增加一个列簇ZD_DATA
alter 'DATA_TABLE','ZD_TABLE'
- 删除列簇
命令行格式:alter tablename { NAME => '列族',METHOD => 'delete ' }
alter 'DATA_TABLE',{ NAME =>'YB_DATA', METHOD => 'delete' }
- 插入数据
命令行格式:put ‘表名’,行键名’,‘列族:列名’,‘值’
注:如果待插入数据的表名、行键值、列族名、列名与原有数据相同,则是更新操作
put 'DATA_TABLE', 'rowkey_one','WD_DATA:value','22.3'
put 'DATA_TABLE', 'rowkey_one','WD_DATA:time','2021-01-01'
put 'DATA_TABLE', 'rowkey_one','YB_DATA:value','25.6'
put 'DATA_TABLE', 'rowkey_one','YB_DATA:time','2021-01-02'put 'DATA_TABLE', 'rowkey_two','WD_DATA:value','25.6'
put 'DATA_TABLE', 'rowkey_two','WD_DATA:time','2021-01-11'
put 'DATA_TABLE', 'rowkey_two','YB_DATA:value','14.5'
put 'DATA_TABLE', 'rowkey_two','YB_DATA:localtion','2021-02-02'
- 查看表中所有数据
插入完数据后,使用scan 表名
查看数据。
hbase(main):017:0> scan 'DATA_TABLE'ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3 rowkey_one column=YB_DATA:time, timestamp=1635040790205, value=2021-01-02 rowkey_one column=YB_DATA:value, timestamp=1635040789577, value=25.6 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11 rowkey_two column=WD_DATA:value, timestamp=1635040680494, value=25.6 rowkey_two column=YB_DATA:localtion, timestamp=1635040794691, value=2021-02-02 rowkey_two column=YB_DATA:value, timestamp=1635040793862, value=14.5
scan是扫描整个表(包括行键rowkey和所有的列簇以及里面的key-value数据),当数据量比较大时候,scan效率较低,想要统计表中的数据行数,可以用count 表名
来统计。
hbase(main):030:0> count 'DATA_TABLE'
2 row(s) in 0.0220 seconds
或者可以使用LIMIT
限制扫描条数
scan 'DATA_TABLE',{ LIMIT => 1}
- 查询一行记录
- 获得指定行的数据
命令行格式:get 表名, 行键
hbase(main):036:0> get 'DATA_TABLE','rowkey_one'COLUMN CELL WD_DATA:time timestamp=1635040680226, value=2021-01-01 WD_DATA:value timestamp=1635040670088, value=22.3 YB_DATA:time timestamp=1635040790205, value=2021-01-02 YB_DATA:value timestamp=1635040789577, value=25.6
- 获得指定行和列簇的数据
命令行格式:get 表名, 行键,列簇
hbase(main):002:0> get 'DATA_TABLE','rowkey_one','WD_DATA'COLUMN CELL WD_DATA:time timestamp=1635040680226, value=2021-01-01 WD_DATA:value timestamp=1635040670088, value=22.3
1 row(s) in 0.0130 seconds
- 获得指定列簇的数据
命令行格式:get 表名, { COLUMN => 列簇}
hbase(main):003:0> scan 'DATA_TABLE',{ COLUMN => 'WD_DATA' }ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11 rowkey_two column=WD_DATA:value, timestamp=1635040680494, value=25.6
1.3.3 过滤器条件查询
hbase的过滤器种类非常多,将不同的过滤器组合使用可以实现非常丰富的功能,这里只通过hbase shell指令实现几个简单的过滤器,以便更好的理解后文中JavaAPI操作hbase。下图是hbase常见的过滤器
过滤器语法:
scan '表名', { Filter => "过滤器(比较运算符, '比较器表达式')” }
上一小节使用的get
指令只能获得一行数据,如果想要获得多行数据,需要用scan
命令来对表进行扫描。
- 值过滤器
hbase(main):004:0> scan 'DATA_TABLE',FILTER=>"ValueFilter(=,'binary:22.3')"ROW COLUMN+CELL rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3
- 查询列名前缀模糊过滤
hbase(main):006:0> scan 'DATA_TABLE',FILTER=>"ColumnPrefixFilter('tim')"ROW COLUMN+CELL rowkey_one column=WD_DATA:time, timestamp=1635040680226, value=2021-01-01 rowkey_one column=YB_DATA:time, timestamp=1635040790205, value=2021-01-02 rowkey_two column=WD_DATA:time, timestamp=1635040680522, value=2021-01-11
2 row(s) in 0.0250 seconds
- FILTER中支持多个过滤条件通过括号、AND和OR进行组合:
hbase(main):002:0> scan 'DATA_TABLE',FILTER=>"ColumnPrefixFilter('valu') AND ValueFilter(=,'binary:22.3')"
ROW COLUMN+CELL rowkey_one column=WD_DATA:value, timestamp=1635040670088, value=22.3
更多过滤器知识,可以查看以下博客:
Hbase过滤器
HBase过滤器笔记
二、JavaAPI操作Hbase
2.1 Maven依赖
其中Hbase的Maven依赖最好和其版本号一致
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- HBase依赖 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.3.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.6</version></dependency></dependencies>
2.2 复制HBase和Hadoop配置文件
使用finalShell或者其他远程工具,将以下三个配置文件复制到Springboot项目下的resource目录中,这3个文件在hadoop的安装目录下的
$HADOOP_HOME/etc/hbase/conf
中。
- hbase-site.xml
- core-site.xml
- log4j.properties
root@whut:/opt/module/hbase/conf# sz core-site.xml hbase-site.xml log4j.properties
注意:请确认配置文件中的服务器节点hostname/ip地址配置正确。如下图使用了whut代替ip,要在修改本机的hosts文件中做好映射。
hosts文件在:C:\Windows\System32\drivers\etc
目录下。
2.3 创建Hbase连接以及admin管理对象
实现步骤:
- 使用HbaseConfiguration.create()创建Hbase配置
- 使用ConnectionFactory.createConnection()创建Hbase连接
- 要创建表,需要基于Hbase连接获取admin管理对象
- 使用admin.close、connection.close关闭连接
使用单例模式,创建一个HBaseConfig类管理admin()对象和Table对象。
@Slf4j
public class HBaseConfig {public static Configuration configuration =null;public static Connection connection =null;public static Admin admin=null;/*这里使用单例模式* 因为配置只需要读取一次就够了,使用static代码块,读取一次,* HBaseConfiguration.create() 该方法会自动读取 resources:下的hbase-site.xml文件* */static {try {configuration=HBaseConfiguration.create();connection = ConnectionFactory.createConnection(configuration);admin=connection.getAdmin();} catch (IOException e) {e.printStackTrace();log.warn("HBase 连接失败");}}public static Admin getAdmin() {return admin;}public static Connection getConnection() {return connection;}public static Configuration getConfiguration() {return configuration;}/** 关闭资源* */public static void close(){if(admin!=null) {try {admin.close();} catch (IOException e) {e.printStackTrace();}}if(connection != null){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Connection
是连接对象,可以获得HBase的管理对象Admin
,以及’Table’表管理对象
Admin
操作对象是数据库表。用于对表的创建、修改、删除。Admin
对象可以获得表数据管理对象Table
Table
操作对象是表里的数据。用于对表的数据进行增删改查。
2.4 使用JavaAPI创建一个表
- 首先创建一个工具类,该工具类包含3个方法:
- 判断表是否存在
- 创建一个表
- 向指定表的指定列簇中添加一行数据
/*** @author WZH* @create 2021-10-24 18:17* @desc hbase工具类**/
@Slf4j
public class HBaseUtils {private static Admin admin =HBaseConfig.getAdmin();private static Connection connection =HBaseConfig.getConnection();/*** 判断表是否存在* * @return {{@link null}}* @author WZH* @date 2021/9/14 11:00*/public static boolean isTableExist(String tableName){try {return admin.tableExists(TableName.valueOf(tableName));} catch (IOException e) {e.printStackTrace();log.warn(tableName+"不存在");return false;}}/**** @param tableName* @param columnFamily* @return {}* @author WZH* @date 2021/9/14 11:05*/public static void createTable(String tableName, String... columnFamily) throwsMasterNotRunningException, ZooKeeperConnectionException, IOException{//判断表是否存在if(isTableExist(tableName)){System.out.println("表" + tableName + "已存在");}else{//创建表属性对象,表名需要转字节HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));//创建多个列族for(String cf : columnFamily){descriptor.addFamily(new HColumnDescriptor(cf));}//根据对表的配置,创建表admin.createTable(descriptor);System.out.println("表" + tableName + "创建成功!");}}/*** 插入一行数据* @param tableName 表名* @param rowKey 行键* @param columnFamily 列簇* @param column 列名* @param value 值* @return {}* @author WZH* @date 2021/9/14 11:14*/public static void addRowData(String tableName, String rowKey, String columnFamily, Stringcolumn, String value) throws IOException{//创建Table对象Table table = connection.getTable(TableName.valueOf(tableName));//向表中插入数据Put put = new Put(Bytes.toBytes(rowKey));//向Put对象中组装数据put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
// 添加数据到tabletable.put(put);table.close();}
}
- 在main()方法,测试创建一个包含一个
info
列簇的Student
表
public static void main(String[] args) {System.setProperty("hadoop.home.dir", "E:\\test");try {HBaseUtils.createTable("Students","info");} catch (IOException e) {e.printStackTrace();}
}
然后在hbase shell脚本中,通过list
语句,查看表Student
是否创建成功。
hbase(main):004:0> listTABLE
DATA_TABLE
Students
2 row(s) in 0.0140 seconds
=> ["DATA_TABLE", "Students"]
通过运行结果发现,表格已经成功创建。
注:程序运行可能会报HADOOP_HOME
没有配置的error
,这是因为HADOOP不是在win10本地,如果不影响表创建,可以不去管它。
2.5 插入一条数据到HBase表中
public static void main(String[] args) {try {HBaseUtils.addRowData("DATA_TABLE", "WD001", "WD_DATA", "value", "20");} catch (IOException e) {e.printStackTrace();}}
执行完程序后,在hbase shell中输入 scan 'DATA_TABLE'
检查是否有插入的数据。
hbase(main):005:0> scan 'DATA_TABLE'ROW COLUMN+CELL WD001 column=WD_DATA:value, timestamp=1635077761411, value=20
三. 千万级数据hbase查询demo
为了更快上手hbase的操作,本文通过一个千万级数据查询demo来讲解。虽然可能并没有存千万级的数据,但是本文的demo是通过一个小项目提取出来的,该项目是达到了亿级的数据量,因此对于千万级的数据查询,下面的方法也是适用的。
demo代码通过以下地址可以获取:
.git
3.1 案例背景
现有A厂有5种温度传感器,传感器编号分别是:WD000,WD001,WD002,WD003,WD004。
现在需要对传感器数据进行存储,并暴露查询接口,给前端进行数据展示。
传感器采集的每一条数据包含:温度的时间戳timeStamp
,温度的值value
,以及传感器IDsensorId
3.1 设计rowkey规则
一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的region 中。
当数据量到达一定程度时候,如一千万条数据,通过key-value
来查询,hbase会扫描整个表的key-value
,这是一个非常耗时间的操作,可能会达到30秒甚至更多。一条数据的唯一标识就是 RowKey,那么这条数据存储于哪个分区,取决于 RowKey 处于哪个一个预分区的区间内,设计 RowKey 的主要目的 ,就是让数据均匀的分布于所有的region 中。
而合理设计一个rowkey
,将要查询的’key-value’信息存入到rowkey中(比如将传感器的ID存在rowkey中),然后仅仅扫描rowkey这一列,效率会非常高,仅需0.几秒就可以检索到满足条件的数据。
本文rowkey=传感器IDsensorID
+时间time
3.2 预分区
- 什么是分区?
当一个HBase表的存储region过大(达到默认10GB)时,表将会进行split,分裂为2个分区。表在进行split的时候,会耗费大量的资源,频繁的分区对HBase的性能有巨大的影响。 - 什么是预分区?
简单来说,就是在hbase数据表,刚建立的时候就进行分区。 - 预分区原理
hbase存储数据是按照rowkey的字典顺序存储,每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护 - 为什么要预分区?
- 防止数据热写
集中写到某一台或者几台机器上,给服务器造成太大压力以及更严重后果 - 减少hbase表的split带来的资源消耗
- 增加数据读写效率
- 负载均衡,防止数据倾斜
- 方便集群容灾调度region
- 优化Map数量
hbase表预分区shell指令语法:
create '表名','列簇1','列簇2'...,SPLITS => ['rowkey1','rowkey2','rowkey3'...]
# 通过rowket的前缀,分为5个范围 WD000-WD001 WD001-WD002 WD002-WD003 WD004-WD005
create 'WD_TABLE','WD_DATA',SPLITS => ['WD000','WD001','WD002','WD003','WD004']
创建完后,可以通过http://hbase所在的ip:16010
查看创建的hbase表。
点击WD_DATA,查看表的分区信息,可以发现表已经按照rowkey分成了几个Region了。
由于本文是一个节点,所以所有RegionServer都在whut
这个节点上,当hbase是集群搭建时候,就会把不同的RegionServer分布在不同的节点上。比如下图所示3节点的hbase集群,10个RegionServer被自动分布在3个节点。
3.3 模拟数据存储
首先模拟一个生产者,向hbase数据表中存入数据,通过main()方法调用一下类
/*** 生产HBase需要的数据* @author WZH* @create 2021-10-24 20:58* @desc**/
public class DataProducer {public static final String TABLE_NAME = "WD_TABLE";public static final String Column_FAMILY = "WD_DATA";private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");public void dataProducer() {// 5种温度传感器 WD000 WD001 WD002 WD003 WD004int SensorType =5;int count=1000;for (int j = 0; j < SensorType; j++) {for (int i = 0; i < count; i++) {String timeStamp = simpleDateFormat.format(new Date());String sensorId ="WD00"+j;String rowkey = sensorId + timeStamp;try {HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"sensorId",sensorId);HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"time",timeStamp);HBaseUtils.addRowData(TABLE_NAME,rowkey,Column_FAMILY,"value",String.valueOf(20+Math.random()*3));} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {DataProducer dataProducer = new DataProducer();dataProducer.dataProducer();}
}
执行上诉的main()方法后,在hbase shell中通过count
命令查看数据表中的数据
hbase(main):062:0> count 'WD_TABLE'Current count: 1000, row: WD00020211025111818932
Current count: 2000, row: WD00120211025111906576
Current count: 3000, row: WD00220211025111953843
Current count: 4000, row: WD00320211025112041904
Current count: 5000, row: WD00420211025112130291
5000 row(s) in 0.3090 seconds
=> 5000
3.4 创建hbase表数据映射对象
该类用于hbase查询时候封装数据。
/*** 传感器数据封装对象* @author WZH* @create 2021-10-25 11:20* @desc**/
@Data
public class DataPojo {private String SensorId;private String value;private String time;
}
3.5 通过rowkey查询某一行数据
首先创建一个HBaseDao类,用于存放相关的增删改查方法。后续方法如果没有特殊说明,都是在这个类中。
/*** HBaseDao 存放各种查询方法* @author WZH* @create 2021-10-24 20:53* @desc**/
@Slf4j
@Repository
public class HBaseDao {private Connection connection=HBaseConfig.getConnection();/*** 通过rowke查询某一行的数据* @param* @return {}* @author WZH*/public void queryByRowKey(String tableName,String rowKey) throws IOException {
// 1. 获取操作表的table对象Table table = connection.getTable(TableName.valueOf(tableName));
// 2. 创建get对象Get get = new Get(Bytes.toBytes(rowKey));
// 3. 执行get请求Result result = table.get(get);
// 4. 查看resultList<Cell> cells = result.listCells();for (Cell cell : cells) {// 打印列蔟名System.out.print("行键:"+ Bytes.toString(CellUtil.cloneRow(cell))+" ");System.out.print("列簇:"+Bytes.toString(CellUtil.cloneFamily(cell))+" ");System.out.print("列:"+ Bytes.toString(CellUtil.cloneQualifier(cell))+" ");System.out.println("值:"+ Bytes.toString(CellUtil.cloneValue(cell))+" ");}table.close();}
}
然后在main()函数中测试
public static void main(String[] args) {HBaseDao hBaseDao =new HBaseDao();try {hBaseDao.queryByRowKey("WD_TABLE","WD00020211025111818932");} catch (IOException e) {e.printStackTrace();}}
输出结果如下:
行键:WD00020211025111818932 列簇:WD_DATA 列:time 值:20211025111818932
行键:WD00020211025111818932 列簇:WD_DATA 列:value 值:21.913362703717333
3.6 获取表中所有数据
/*** 获取所有数据* @param tableName* @return {}* @author WZH* @date 2021/9/14 12:40*/public List<DataPojo> getAllRows(String tableName) throws IOException{Table hTable = connection.getTable(TableName.valueOf(tableName));//得到用于扫描region的对象Scan scan = new Scan();//使用HTable得到resultcanner实现类的对象ResultScanner resultScanner = hTable.getScanner(scan);List<DataPojo> dataList = DataProcessUtils.getDataList(resultScanner);return dataList;}
这里为了简化代码,将数据封装的工作,通过一个DataProcessUtils
类进行封装。
/*** @author WZH* @create 2021-09-19 14:12* @desc HBase数据处理、封装工具类**/
public class DataProcessUtils {public static List<DataPojo> getDataList(ResultScanner scanner) {//创建一个List用于保存数据List<DataPojo> dataList = new ArrayList<DataPojo>();for (Result result : scanner) {List<Cell> cellList = result.listCells();DataPojo dataPojo = new DataPojo();// 迭代单元格列表for (Cell cell : cellList) {// 打印列蔟名String cell_key = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());String cell_value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());if ("sensorId".equals(cell_key)) {dataPojo.setSensorId(cell_value);}if ("time".equals(cell_key)) {dataPojo.setTime(cell_value);}if ("value".equals(cell_key)) {dataPojo.setValue(String.format("%.3f", Float.valueOf(cell_value)));}}dataList.add(dataPojo);}return dataList;}
}
在main()方法中进行测试
public static void main(String[] args) {HBaseDao hBaseDao =new HBaseDao();try {List<DataPojo> dataList = hBaseDao.getAllRows("WD_TABLE");System.out.println(dataList.get(0));} catch (IOException e) {e.printStackTrace();}}
}
这里我们只打印数组长度和第一个元素,打印结果如下:
一共查询到了:5000个数据
DataPojo(sensorId=WD000, value=20.282, time=20211025150229253)
3.7 过滤器组合查询
3.7.1 模糊查询
通过RowFileter和正则匹配过滤器,实现模糊查询符合条件的数据。
/*** 通过rowkey 模糊查询某个时间的某个表格的数据* @param time :时间字符串 形式:yyyyMMddHH... 2021091913* @return {{@link List<DataPojo>}}* @author WZH* @date 2021/9/19 14:08*/public List<DataPojo> fuzzyQueryByRowKey(String time,String tableName) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();// 通过正则匹配 rowkeyRowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*" + time + ".*"));scan.setFilter(rowFilter);ResultScanner scanner = table.getScanner(scan);//通过工具类来封装hbase查询到的数据List<DataPojo> dataList = DataProcessUtils.getDataList(scanner);return dataList;}
main()方法中测试:
public static void main(String[] args) {HBaseDao hBaseDao =new HBaseDao();try {//查询2021-10-25 15:03:46的数据List<DataPojo> dataList = hBaseDao.fuzzyQueryByRowKey("20211025150346", "WD_TABLE");System.out.println("一共查询到了:"+dataList.size()+"个数据");System.out.println(dataList.get(0));} catch (IOException e) {e.printStackTrace();}}
查询结果:
一共查询到了:14个数据
DataPojo(sensorId=WD000, value=20.709, time=20211025150346028)
3.7.2 可降采样的查询指定传感器某一时间范围内的所有数据
/*** 通过rowkey 查询某一时间段的数据* @return {{@link List<DataPojo>}}* @author WZH* @date 2021/9/22 15:21*/public List<DataPojo> randomDownSampleQuery(String startTime, String endTime, String tableName, String sensorId, float SampleRate) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();//设置查询范围scan.withStartRow(Bytes.toBytes(sensorId+startTime));scan.withStopRow(Bytes.toBytes(sensorId+endTime));// 随机过滤,实现降采样 需要传入一个 0-1之间的float型数字. RandomRowFilter randomRowFilter = new RandomRowFilter(new Float(SampleRate));scan.setFilter(randomRowFilter);ResultScanner scanner = table.getScanner(scan);List<DataPojo> dataList = DataProcessUtils.getDataList(scanner);// 关闭资源scanner.close();table.close();log.info("一共查询了"+dataList.size()+"记录");return dataList;}
这里的降采样是通过一个随机过滤器RandomRowFilter
,通过阅读源码可知,该过滤器需要传入一个0-1之间的float型数字,这个数字就是筛选的比例。
- 当小于0就是不过滤,直接全部返回
- 当大于1,所有的都pass掉
- 当在0-1之间,它回通过random产生一个随机数,通过对比随机数和传入的数字大小,来决定是否过滤。
在main()方法中测试,查询202110251503
-202110251504
时间范围内WD001
传感器的数据:
public static void main(String[] args) {HBaseDao hBaseDao =new HBaseDao();try {
// hBaseDao.queryByRowKey("WD_TABLE","WD00020211025111818932");
// List<DataPojo> dataList = hBaseDao.getAllRows("WD_TABLE");//查询2021-10-25 15:03:46的数据
// List<DataPojo> dataList = hBaseDao.fuzzyQueryByRowKey("20211025150346", "WD_TABLE");List<DataPojo> dataList = hBaseDao.randomDownSampleQuery("202110251503", "202110251504", "WD_TABLE", "WD001", 0.5f);} catch (IOException e) {e.printStackTrace();}}
执行结果:
15:48:51.379 [main] INFO com.example.demo.dao.HBaseDao - 本次降查询一共耗时2秒
15:48:51.379 [main] INFO com.example.demo.dao.HBaseDao - 一共查询了100记录
3.7.3 可降采样查询某一时间范围内多个传感器的值
/*** 可降采样查询某一时间范围内多个传感器的值* @param startTime 开始时间* @param endTime 结束时间* @param tableName HBASE表名称* @param siteList 存放带查询的测点的集合* @param sampleRate 降采样频率,在 0-1 之间,若 < 0,则不进行降采样* @return {{@link List<DataPojo>}}* @author WZH* @date 2021/10/20 20:49*/public List<DataPojo> multiSiteQuery(String startTime, String endTime,String tableName, List<String> siteList, float sampleRate) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));Scan scan = new Scan();long time1 =new Date().getTime();//创建一个 RowRange 的 List 实现多值过滤ArrayList<MultiRowRangeFilter.RowRange> rowRanges = new ArrayList<>();for (String site : siteList) {MultiRowRangeFilter.RowRange rowRange = new MultiRowRangeFilter.RowRange(site+startTime, true,site+endTime, true);rowRanges.add(rowRange);}MultiRowRangeFilter multiRowRangeFilter = new MultiRowRangeFilter(rowRanges);// 过滤器组合列表 所有过滤器,与操作FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,multiRowRangeFilter); 随机过滤,实现降采样if (sampleRate < 1f && sampleRate >0f) {RandomRowFilter randomRowFilter = new RandomRowFilter(sampleRate);filterList.addFilter(randomRowFilter);}scan.setFilter(filterList);ResultScanner scanner = table.getScanner(scan);List<DataPojo> dataList = DataProcessUtils.getDataList(scanner);// 关闭资源scanner.close();table.close();long time2 = new Date().getTime();long l = (time2 - time1) / 1000L;log.info("本次降查询一共耗时"+l+"秒");log.info("一共查询了"+dataList.size()+"记录");log.info("本次查询使用了"+filterList.size()+"个过滤器");return dataList;}
在main()方法中测试,查询202110251503
-202110251504
时间范围内,WD000 WD002 WD003
3个传感器的数据。
public static void main(String[] args) {HBaseDao hBaseDao =new HBaseDao();try {ArrayList<String> siteList = new ArrayList<>();siteList.add("WD000");siteList.add("WD002");siteList.add("WD003");List<DataPojo> dataList = hBaseDao.multiSiteQuery("202110251503", "202110251504", "WD_TABLE", siteList,0.01f);for (DataPojo dataPojo : dataList) {System.out.println(dataPojo);}} catch (IOException e) {e.printStackTrace();}}
16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 本次降查询一共耗时1秒
16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 一共查询了9记录
16:00:39.047 [main] INFO com.example.demo.dao.HBaseDao - 本次查询使用了2个过滤器
DataPojo(sensorId=WD000, value=20.591, time=20211025150303725)
DataPojo(sensorId=WD000, value=20.174, time=20211025150304734)
DataPojo(sensorId=WD000, value=21.907, time=20211025150308801)
DataPojo(sensorId=WD000, value=21.004, time=20211025150309758)
DataPojo(sensorId=WD000, value=22.650, time=20211025150314625)
DataPojo(sensorId=WD000, value=20.768, time=20211025150326350)
DataPojo(sensorId=WD000, value=21.067, time=20211025150330299)
DataPojo(sensorId=WD000, value=21.403, time=20211025150331220)
DataPojo(sensorId=WD000, value=20.037, time=20211025150336603)
更多过滤器请查考一下文章:
四、其他小知识
- 比
count
更快的统计行数的指令。
在 hbase安装目录下的$HBASE_HOME/bin
命令执行:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter '表名'
后记
本文仅仅对hbase的入门知识作了一个简单介绍,在完成这个入门案例后,在去学习一些更深入的habse知识,比如和hive结合等,都会更加轻松。
更多推荐
【SpringBoot+HBase 】快速入门
发布评论