HBase 架构
HBase是Hadoop的数据库,能够对大数据提供随机、实时的读写访问。它是一个开源的、分布式的、多版本的、面向列的存储系统。
在讲解的时候我首先给大家讲解一下HBase的整体结构,如下图
HBase Master是主服务器,负责管理所有的HRegion服务器;HBase Master本身并不存储HBase的任何数据。HBase逻辑上的表可能会被划分为多个HRegion,然后存储在HRegion Server集群中;HBase Master Server中存储的是从数据到HRegion Server的映射。
一台机器只能运行一个HRegion服务器,数据的操作会记录在Hlog中。在读取数据的时候,HRegion会先访问Hmemcache缓存,如果缓存中没有数据,才会到Hstore中查找。每一个列族都会有一个Hstore集合,每个Hstore集合包含了很多具体的HstoreFile文件,这些文件是B树结构的,方便快速读取。
HBase数据物理视图如下:
Ø Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number
Ø Row Key: 行键,Table的主键,Table中的记录按照Row Key排序
Ø Column Family:列簇,Table在水平方向由一个或者多个Column Family组成,一个Column Family可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型。所有Column均以二进制格式存储,用户需要自行进行类型转换。
Java来操作HBase数据
如果hbase shell已经设置在环境变量中的话,可以直接输入以下命令进入
hbase shell
0)查看基本信息
version
status
whoami
表的管理
1)查看有哪些表
list
2)查结构 describe 'my_test'
查数据内容: scan 'my_test',{LIMIT => 5}
获取某个row key下所有"列簇:列"的数据:
get 'test_USERPERHOURINFO_SPARK','20170228#20170228112348'
ps: scan命令中的 "LIMIT" 必须大写
封装一个Java 操作HBase 1.1.7 的Api,代码如下:
/**
* Created by eric on 2016/11/24
* 基于 HBase 1.1.7版本 HBase集群 的 java api 封装
*/
package com.bw.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Static helper methods wrapping the HBase 1.1.x client API.
 *
 * <p>The HBase connection settings are read once, when the class is loaded,
 * from {@code config.properties}. Every public method opens its own
 * {@link Connection} and closes it (together with the {@link Table} or
 * {@link Admin} it used) before returning, including when an error occurs.
 */
class HBaseUtil {

    /** Properties file location, resolved against the current working directory. */
    private static final String CONFIG_FILE = "./src/main/resources/config/config.properties";

    /** Classpath location tried when {@link #CONFIG_FILE} does not exist. */
    private static final String CONFIG_RESOURCE = "/config/config.properties";

    /** Client configuration shared by all methods; built once at class-load time. */
    private static final Configuration conf = loadConfiguration();

    /**
     * Builds the HBase client configuration from the properties file.
     *
     * <p>A missing or unreadable file is reported on {@code System.err} and the
     * defaults found by {@link HBaseConfiguration#create(Configuration)} are
     * used, instead of failing class initialization.
     *
     * @return the configuration used to open connections
     */
    private static Configuration loadConfiguration() {
        Properties prop = new Properties();
        // try-with-resources tolerates a null resource, so no explicit close is needed.
        try (InputStream in = openConfigStream()) {
            if (in == null) {
                System.err.println("HBaseUtil: " + CONFIG_FILE
                        + " not found; falling back to the default HBase configuration");
            } else {
                prop.load(in);
            }
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }

        Configuration hbaseConfig = new Configuration();
        copyProperty(prop, "hbase.zookeeper.property.clientPort",
                hbaseConfig, "hbase.zookeeper.property.clientPort");
        copyProperty(prop, "hbase.zookeeper.quorum", hbaseConfig, "hbase.zookeeper.quorum");
        copyProperty(prop, "hbase.master.port", hbaseConfig, "hbase.master.port");
        // The properties file uses an "hbase."-prefixed key for the znode parent.
        copyProperty(prop, "hbase.zookeeper.znode.parent", hbaseConfig, "zookeeper.znode.parent");
        return HBaseConfiguration.create(hbaseConfig);
    }

    /**
     * Opens the properties file, preferring the file-system path and falling
     * back to the classpath resource.
     *
     * @return an open stream, or {@code null} when neither location exists
     */
    private static InputStream openConfigStream() {
        try {
            return new FileInputStream(new File(CONFIG_FILE));
        } catch (FileNotFoundException e) {
            return HBaseUtil.class.getResourceAsStream(CONFIG_RESOURCE);
        }
    }

    /**
     * Copies one property into the configuration if it is present.
     * Absent properties are skipped because {@code Configuration.set}
     * does not accept a null value.
     */
    private static void copyProperty(Properties from, String fromKey, Configuration to, String toKey) {
        String value = from.getProperty(fromKey);
        if (value != null) {
            to.set(toKey, value);
        }
    }

    /**
     * Creates a table with the given column families, each using SNAPPY compression.
     *
     * <p>If the table already exists a message is printed and nothing is changed.
     *
     * @param tableName     name of the table to create
     * @param columnFamilys names of the column families to add
     * @throws IOException if the connection cannot be opened or the table cannot be created
     */
    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (admin.tableExists(name)) {
                // Report and return; a utility method must not terminate the JVM.
                System.out.println(tableName + "表已存在");
                return;
            }
            HTableDescriptor tableDesc = new HTableDescriptor(name);
            for (String columnFamily : columnFamilys) {
                HColumnDescriptor familyDesc = new HColumnDescriptor(columnFamily);
                familyDesc.setCompressionType(Compression.Algorithm.SNAPPY);
                tableDesc.addFamily(familyDesc);
            }
            admin.createTable(tableDesc);
            System.out.println("创建" + tableName + "表成功");
        }
    }

    /**
     * Inserts or updates a single cell.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @param value     cell value
     * @return {@code true} if the put succeeded; {@code false} if an
     *         {@link IOException} occurred (its stack trace is printed)
     * @throws IOException declared for compatibility; I/O errors are reported via the return value
     */
    public static boolean saveData(String tableName, String rowKey, String family, String qualifier,
                                   String value) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Reads the value of one column of one row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @return the cell value decoded as a string, or {@code null} if the cell
     *         does not exist or an {@link IOException} occurred
     * @throws IOException declared for compatibility; I/O errors yield {@code null}
     */
    public static String getValueByQualifier(String tableName, String rowKey, String family, String qualifier)
            throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            Result res = table.get(get);
            // listCells() is null for an empty result, so check before dereferencing.
            if (res.isEmpty()) {
                return null;
            }
            return Bytes.toString(CellUtil.cloneValue(res.listCells().get(0)));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads every column of one column family of one row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @return a map from column qualifier to value, or {@code null} if the row
     *         has no cells in that family or an {@link IOException} occurred
     * @throws IOException declared for compatibility; I/O errors yield {@code null}
     */
    public static Map<String, String> getValueByFamily(String tableName, String rowKey, String family)
            throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(Bytes.toBytes(family));
            Result res = table.get(get);
            if (res.isEmpty()) {
                return null;
            }
            Map<String, String> result = new HashMap<>();
            for (Cell cell : res.rawCells()) {
                // Key by qualifier: the family is the same for every cell returned here.
                result.put(
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
            return result;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads every column of every column family of one row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return a map from column family to a map from qualifier to value, or
     *         {@code null} if the row has no cells or an {@link IOException} occurred
     * @throws IOException declared for compatibility; I/O errors yield {@code null}
     */
    public static Map<String, Map<String, String>> getValueByFamilyAll(String tableName, String rowKey)
            throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Result res = table.get(new Get(Bytes.toBytes(rowKey)));
            if (res.isEmpty()) {
                return null;
            }
            Map<String, Map<String, String>> results = new HashMap<>();
            for (Cell cell : res.rawCells()) {
                String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
                Map<String, String> columns = results.get(familyName);
                if (columns == null) {
                    columns = new HashMap<>();
                    results.put(familyName, columns);
                }
                columns.put(
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
            return results;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deletes a column, a column family, or a whole row.
     *
     * <p>With a non-null {@code qualifier} the single column is deleted (and
     * {@code family} must then be non-null as well); otherwise with a non-null
     * {@code family} the whole family is deleted; with both null the whole row
     * is deleted.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family, or {@code null}
     * @param qualifier column qualifier, or {@code null}
     * @return {@code true} if the delete succeeded; {@code false} if an
     *         {@link IOException} occurred (its stack trace is printed)
     */
    public static boolean del(String tableName, String rowKey, String family, String qualifier) {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Delete del = new Delete(Bytes.toBytes(rowKey));
            if (qualifier != null) {
                del.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            } else if (family != null) {
                del.addFamily(Bytes.toBytes(family));
            }
            table.delete(del);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean delRowKey(String tableName, String rowKey) {
        return del(tableName, rowKey, null, null);
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean delFamily(String tableName, String rowKey, String family) {
        return del(tableName, rowKey, family, null);
    }

    /**
     * Fetches one row by its key and prints every cell to standard output.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException if the connection or the read fails
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            for (Cell cell : result.rawCells()) {
                printCell(cell);
            }
        }
    }

    /**
     * Scans the whole table and prints every cell to standard output.
     *
     * @param tableName table name
     * @throws IOException if the connection or the scan fails
     */
    public static void scanTable(String tableName) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName));
             ResultScanner results = table.getScanner(new Scan())) {
            for (Result result : results) {
                for (Cell cell : result.rawCells()) {
                    printCell(cell);
                }
            }
        }
    }

    /**
     * Prints one cell (row key, family, qualifier, value, timestamp) on a single line.
     * Bytes are decoded with {@link Bytes#toString(byte[])} rather than the platform charset.
     */
    private static void printCell(Cell cell) {
        System.out.println(
                "行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
                "列族:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
                "列名:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
                "值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t" +
                "时间戳:" + cell.getTimestamp());
    }

    /**
     * Deletes several rows in one batch.
     *
     * @param tableName table name
     * @param rows      keys of the rows to delete
     * @throws IOException if the connection or the delete fails
     */
    public static void delRows(String tableName, String[] rows) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            List<Delete> deletes = new ArrayList<>(rows.length);
            for (String row : rows) {
                deletes.add(new Delete(Bytes.toBytes(row)));
            }
            table.delete(deletes);
        }
    }

    /**
     * Drops a table, disabling it first if it is still enabled.
     *
     * @param tableName name of the table to drop
     * @return {@code true} if the table existed and was deleted;
     *         {@code false} if no such table exists
     * @throws IOException if the connection or an admin operation fails
     */
    public static boolean deleteTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(name)) {
                return false;
            }
            // disableTable fails on a table that is already disabled.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            return true;
        }
    }
}