加载中...
Hbase介绍及操作
发表于:2021-05-21 | 分类: 大数据
字数统计: 3.3k | 阅读时长: 16分钟 | 阅读量:

1. Hbase概述

1.1 Hbase是什么

  • HBase是建立在HDFS之上的分布式面向列的数据库;属于KV结构数据,原生不支持标准SQL。它是一个Apache的开源项目,是横向扩展的。
  • HBase可以提供快速随机访问海量结构化数据。它利用了Hadoop的文件系统(HDFS)提供的容错能力。
  • HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库,是HBase基于列的而不是基于行的模式。

1.2 Hbase数据单元

RowKey:是Byte array,是表中每条记录的“主键”,按照字典顺序排序,方便快速查找,Rowkey的设计非常重要;
Column Family:列族,拥有一个名称(string),包含一个或者多个相关列;
Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加;
Version Number:类型为Long,默认值是系统时间戳Timestamp,可由用户自定义;用于标记同一份数据的不同版本。
Value(Cell):Byte array;

2. Hbase的架构

2.1 Hbase的架构组成

HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由以下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群,而在底层,它将数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode等,总体结构如下:

各组件说明:
Client

  • 使用HBase RPC机制与HMaster和HRegionServer进行通信;
  • Client与HMaster进行通信进行管理类操作;
  • Client与HRegionServer进行数据读写类操作;

HMaster

  • HMaster没有单点问题,HBase中可以启动多个HMaster,通过Zookeeper保证总有一个Master在运行。

HMaster主要负责Table和Region的管理工作

  • 管理用户对表的增删改查操作;
  • 管理HRegionServer的负载均衡,调整Region分布;
  • Region Split后,负责新Region的分布;
  • 在HRegionServer停机后,负责失效HRegionServer上Region 的迁移;

HRegionServer
HBase中最核心的模块;

  • 维护region,处理对这些region的IO请求;
  • Regionserver负责切分在运行过程中变得过大的region;

HRegion

  • HBase使用RowKey将表水平切割成多个HRegion,从HMaster的角度,每个HRegion都纪录了它的StartKey和EndKey(第一个HRegion的StartKey为空,最后一个HRegion的EndKey为空),由于RowKey是排序的,因而Client可以通过HMaster快速的定位每个RowKey在哪个HRegion中。

2.2查看hbase的web界面

http://master:16010/

3.hbase shell操作

3.1 DDL操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#开启hbase shell
hbase shell
#查看hbase状态
status
#查看hbase版本
version
#创建命名空间
create_namespace '命名空间名'
#显示所有命名空间
list_namespace
#删除命名空间, 在删除一个命名空间时,该命名空间不能包含任何的表,否则会报错
drop_namespace '命名空间名'
#创建默认命名空间的表
create '表名称', '列族名称1','列族名称2','列族名称N'
#创建带有命名空间的表
create '命名空间:表名称', '列族名称1','列族名称2','列族名称N'
#列出所有表
list
#获得表的描述
describe '表名'
#删除table 表的 列族名称1 列族
alter 'table',{NAME=>'列族名称1',METHOD=>'delete'}
#删除多个列族
alter 'table', {NAME => '列族名称1', METHOD => 'delete'},{NAME => '列族名称2', METHOD => 'delete'}
#先把表下线
disable '表名'
#再drop表
drop '表名'

3.2 DML操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#添加数据
# 语法:put <table>,<rowkey>,<family:column>,<value>,[<timestamp>]
#如果不写timestamp,则系统默认
put 'table','id01', 'c_f1:name','111'

#获取数据
#get: 获取表中一行数据,不能扫描全表
# 语法:get <table>,<rowkey>,[<family:column>,....]
get 'table','id01'

#更新数据
#语法:重新put,put时会覆盖原来的数据
put 'table','id01', 'c_f1:name','222'

#scan扫描
# 语法:scan <table> ,{COLUMNS => [ <family:column>,.... ], LIMIT => num}
#扫描全表,大表操作不可取
scan 'table'
#获取表中前两行
scan 'table', {LIMIT => 2}
#扫描表中指定列族数据
scan 'table', {COLUMNS => 'c_f1'}
#扫描表中执行列族中列的数据
scan 'table', {COLUMNS => 'c_f2:cert_no'}
#扫描表中值=222 的数据
scan 'table', FILTER=>"ValueFilter(=,'name:222')"
# 筛选行,按照rowkey的范围[STARTROW,STOPROW)
scan 'table', {STARTROW =>'id01' , STOPROW => 'id03'}

#删除行中某列数据
# 语法:delete <table>, <rowkey>, <family:column>
# 必须指定列名
# 会删除执行列的所有版本数据
delete 'table', 'id04', 'c_f2:name'

#删除整行
# 语法:deleteall <table>, <rowkey>
deleteall 'table', 'id05'

#清空表数据
# 语法: truncate <table>
truncate 'table'

#查询表中有多少行
# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
# INTERVAL设置多少行显示一次及对应的rowkey,默认1000;
# CACHE每次去取的缓存区大小,默认是10,调整该参数可提高查询速度
#查询表中数据行数
count 'table'
#按照2行显示一次,查询
count 'table', {INTERVAL => 2}

4.hbase整合springboot

4.1pom

1
2
3
4
5
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.6</version>
</dependency>

4.2 application.properties

1
hbase.conf.confMaps.'hbase.zookeeper.quorum'=master,slave1,slave2

4.3 HbaseConfig 自定义配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
* Hbase-Conf配置
*
* @Author: suyuan
*/
@Configuration
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
public class HbaseConfig {

public static final String CONF_PREFIX = "hbase.conf";

private Map<String,String> confMaps;

public Map<String, String> getconfMaps() {
return confMaps;
}
public void setconfMaps(Map<String, String> confMaps) {
this.confMaps = confMaps;
}
}

4.4 HBaseUtils工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
*/
@Component
public class SpringContextHolder implements ApplicationContextAware {

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextHolder.applicationContext = applicationContext;
}

public static ApplicationContext getApplicationContext() {
assertApplicationContext();
return applicationContext;
}

@SuppressWarnings("unchecked")
public static <T> T getBean(String beanName) {
assertApplicationContext();
return (T) applicationContext.getBean(beanName);
}

public static <T> T getBean(Class<T> requiredType) {
assertApplicationContext();
return applicationContext.getBean(requiredType);
}

private static void assertApplicationContext() {
if (SpringContextHolder.applicationContext == null) {
throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@DependsOn("springContextHolder") //控制依赖顺序,保证springContextHolder类在之前已经加载
@Component
public class HBaseUtils {

private Logger logger = LoggerFactory.getLogger(this.getClass());

//手动获取hbaseConfig配置类对象
private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");

private static Configuration conf = HBaseConfiguration.create();
private static ExecutorService pool = Executors.newScheduledThreadPool(20); //设置连接池
private static Connection connection = null;
private static HBaseUtils instance = null;
private static Admin admin = null;

private HBaseUtils(){
if(connection == null){
try {
//将hbase配置类中定义的配置加载到连接池中每个连接里
Map<String, String> confMap = hbaseConfig.getconfMaps();
for (Map.Entry<String,String> confEntry : confMap.entrySet()) {
conf.set(confEntry.getKey(), confEntry.getValue());
}
connection = ConnectionFactory.createConnection(conf, pool);
admin = connection.getAdmin();
} catch (IOException e) {
logger.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
}
}
}

//简单单例方法,如果autowired自动注入就不需要此方法
public static synchronized HBaseUtils getInstance(){
if(instance == null){
instance = new HBaseUtils();
}
return instance;
}


/**
* 创建表
*
* @param tableName 表名
* @param columnFamily 列族(数组)
*/
public void createTable(String tableName, String[] columnFamily) throws IOException{
TableName name = TableName.valueOf(tableName);
//如果存在则删除
if (admin.tableExists(name)) {
admin.disableTable(name);
admin.deleteTable(name);
logger.error("create htable error! this table {} already exists!", name);
} else {
HTableDescriptor desc = new HTableDescriptor(name);
for (String cf : columnFamily) {
desc.addFamily(new HColumnDescriptor(cf));
}
admin.createTable(desc);
}
}

/**
* 插入记录(单行单列族-多列多值)
*
* @param tableName 表名
* @param row 行名
* @param columnFamilys 列族名
* @param columns 列名(数组)
* @param values 值(数组)(且需要和列一一对应)
*/
public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values) throws IOException {
TableName name = TableName.valueOf(tableName);
Table table = connection.getTable(name);
Put put = new Put(Bytes.toBytes(row));
for (int i = 0; i < columns.length; i++) {
put.addColumn(Bytes.toBytes(columnFamilys), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
table.put(put);
}
}

/**
* 插入记录(单行单列族-单列单值)
*
* @param tableName 表名
* @param row 行名
* @param columnFamily 列族名
* @param column 列名
* @param value 值
*/
public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value) throws IOException {
TableName name = TableName.valueOf(tableName);
Table table = connection.getTable(name);
Put put = new Put(Bytes.toBytes(row));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(put);
}

/**
* 删除一行记录
*
* @param tablename 表名
* @param rowkey 行名
*/
public void deleteRow(String tablename, String rowkey) throws IOException {
TableName name = TableName.valueOf(tablename);
Table table = connection.getTable(name);
Delete d = new Delete(rowkey.getBytes());
table.delete(d);
}

/**
* 删除单行单列族记录
* @param tablename 表名
* @param rowkey 行名
* @param columnFamily 列族名
*/
public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
TableName name = TableName.valueOf(tablename);
Table table = connection.getTable(name);
Delete d = new Delete(rowkey.getBytes()).deleteFamily(Bytes.toBytes(columnFamily));
table.delete(d);
}

/**
* 删除单行单列族单列记录
*
* @param tablename 表名
* @param rowkey 行名
* @param columnFamily 列族名
* @param column 列名
*/
public void deleteColumn(String tablename, String rowkey, String columnFamily, String column) throws IOException {
TableName name = TableName.valueOf(tablename);
Table table = connection.getTable(name);
Delete d = new Delete(rowkey.getBytes()).deleteColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
table.delete(d);
}


/**
* 查找一行记录
*
* @param tablename 表名
* @param rowKey 行名
*/
public static String selectRow(String tablename, String rowKey) throws IOException {
String record = "";
TableName name=TableName.valueOf(tablename);
Table table = connection.getTable(name);
Get g = new Get(rowKey.getBytes());
Result rs = table.get(g);
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = rs.getMap();
for (Cell cell : rs.rawCells()) {
StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRow())).append("\t")
.append(Bytes.toString(cell.getFamily())).append("\t")
.append(Bytes.toString(cell.getQualifier())).append("\t")
.append(Bytes.toString(cell.getValue())).append("\n");
String str = stringBuffer.toString();
record += str;
}
return record;
}

/**
* 查找单行单列族单列记录
*
* @param tablename 表名
* @param rowKey 行名
* @param columnFamily 列族名
* @param column 列名
* @return
*/
public static String selectValue(String tablename, String rowKey, String columnFamily, String column) throws IOException {
TableName name=TableName.valueOf(tablename);
Table table = connection.getTable(name);
Get g = new Get(rowKey.getBytes());
g.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
Result rs = table.get(g);
return Bytes.toString(rs.value());
}

/**
* 查询表中所有行(Scan方式)
*
* @param tablename
* @return
*/
public String scanAllRecord(String tablename) throws IOException {
String record = "";
TableName name=TableName.valueOf(tablename);
Table table = connection.getTable(name);
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
try {
for(Result result : scanner){
for (Cell cell : result.rawCells()) {
StringBuffer stringBuffer = new StringBuffer().append(Bytes.toString(cell.getRow())).append("\t")
.append(Bytes.toString(cell.getFamily())).append("\t")
.append(Bytes.toString(cell.getQualifier())).append("\t")
.append(Bytes.toString(cell.getValue())).append("\n");
String str = stringBuffer.toString();
record += str;
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}

return record;
}

/**
* 根据rowkey关键字查询报告记录
*
* @param tablename
* @param rowKeyword
* @return
*/
public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
ArrayList<Object> list = new ArrayList<>();

Table table = connection.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();

//添加行键过滤器,根据关键字匹配
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
scan.setFilter(rowFilter);

ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
//TODO 此处根据业务来自定义实现
list.add(null);
}
} finally {
if (scanner != null) {
scanner.close();
}
}

return list;
}

/**
* 根据rowkey关键字和时间戳范围查询报告记录
*
* @param tablename
* @param rowKeyword
* @return
*/
public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp, Long maxStamp) throws IOException {
ArrayList<Object> list = new ArrayList<>();

Table table = connection.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();
//添加scan的时间范围
scan.setTimeRange(minStamp, maxStamp);

RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKeyword));
scan.setFilter(rowFilter);

ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
//TODO 此处根据业务来自定义实现
list.add(null);
}
} finally {
if (scanner != null) {
scanner.close();
}
}

return list;
}


/**
* 删除表操作
*
* @param tablename
*/
public void deleteTable(String tablename) throws IOException {
TableName name=TableName.valueOf(tablename);
if(admin.tableExists(name)) {
admin.disableTable(name);
admin.deleteTable(name);
}
}

/**
* 利用协处理器进行全表count统计
*
* @param tablename
*/
public Long countRowsWithCoprocessor(String tablename) throws Throwable {
TableName name=TableName.valueOf(tablename);
HTableDescriptor descriptor = admin.getTableDescriptor(name);

String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
if (! descriptor.hasCoprocessor(coprocessorClass)) {
admin.disableTable(name);
descriptor.addCoprocessor(coprocessorClass);
admin.modifyTable(name, descriptor);
admin.enableTable(name);
}

//计时
StopWatch stopWatch = new StopWatch();
stopWatch.start();

Scan scan = new Scan();
AggregationClient aggregationClient = new AggregationClient(conf);

Long count = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);

stopWatch.stop();
System.out.println("RowCount:" + count + ",全表count统计耗时:" + stopWatch.getTotalTimeMillis());

return count;
}

}


4.5 测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hbase")
public class HbaseController {

@Autowired
private HBaseUtils hBaseUtils;

/**
* 创建表
* @author suyuan
* @date 2021/6/22 17:17
*/
@RequestMapping("/createTable")
public String createTable() throws Exception {
String[] arr = {"name","age"};
hBaseUtils.createTable("sy:syy",arr);
return "ok";
}

/**
* 删除表
* @author suyuan
* @date 2021/6/22 17:18
*/
@RequestMapping("/deleteTable")
public String deleteTable() throws Exception {
hBaseUtils.deleteTable("syy");
return "ok";
}

/**
* 插入记录(插入同列族数据就是更新)
* @author suyuan
* @date 2021/6/22 17:18
*/
@RequestMapping("/insertRecords")
public String insertRecords() throws Exception {
hBaseUtils.insertRecords("sy:syy","20210622","name", new String[]{"name1","name2"}, new String[]{"111","222"});
return "ok";
}

/**
* 查找一行记录
* @author suyuan
* @date 2021/6/22 17:18
*/
@RequestMapping("/selectRow")
public String selectRow() throws Exception {
String s = hBaseUtils.selectRow("sy:syy", "20210622");
System.out.println(s);
return s;
}

}

本地编写代码需配置:

host地址

1
2
3
192.168.10.160  master
192.168.10.161 slave1
192.168.10.162 slave2

集群需要启动

zookeeper、hadoop、hbase

上一篇:
01redis和zk锁的实现原理
下一篇:
mybatis-plus总结
本文目录
本文目录