Hbase调用JavaAPI实现批量导入操作

Stella981
• 阅读 1135

将手机上网日志文件批量导入到Hbase中。操作步骤:

1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /

Hbase调用JavaAPI实现批量导入操作

2、创建Hbase表,通过Java操作

Java代码   Hbase调用JavaAPI实现批量导入操作

  1. package com.jiewen.hbase;

  2. import java.io.IOException;

  3. import org.apache.hadoop.conf.Configuration;

  4. import org.apache.hadoop.hbase.HBaseConfiguration;

  5. import org.apache.hadoop.hbase.HColumnDescriptor;

  6. import org.apache.hadoop.hbase.HTableDescriptor;

  7. import org.apache.hadoop.hbase.client.Get;

  8. import org.apache.hadoop.hbase.client.HBaseAdmin;

  9. import org.apache.hadoop.hbase.client.HTable;

  10. import org.apache.hadoop.hbase.client.Put;

  11. import org.apache.hadoop.hbase.client.Result;

  12. import org.apache.hadoop.hbase.client.ResultScanner;

  13. import org.apache.hadoop.hbase.client.Scan;

  14. import org.apache.hadoop.hbase.util.Bytes;

  15. public class HbaseDemo {

  16. public static void main(String[] args) throws IOException {

  17. String tableName = "wlan_log";

  18. String columnFamily = "cf";

  19. HbaseDemo.create(tableName, columnFamily);

  20. // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");

  21. // HbaseDemo.get(tableName, "row1");

  22. // HbaseDemo.scan(tableName);

  23. // HbaseDemo.delete(tableName);

  24. }

  25. // hbase操作必备

  26. private static Configuration getConfiguration() {

  27. Configuration conf = HBaseConfiguration.create();

  28. conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");

  29. // 使用eclipse时必须加入这个,否则无法定位

  30. conf.set("hbase.zookeeper.quorum", "hadoop1");

  31. return conf;

  32. }

  33. // 创建一张表

  34. public static void create(String tableName, String columnFamily)

  35. throws IOException {

  36. HBaseAdmin admin = new HBaseAdmin(getConfiguration());

  37. if (admin.tableExists(tableName)) {

  38. System.out.println("table exists!");

  39. } else {

  40. HTableDescriptor tableDesc = new HTableDescriptor(tableName);

  41. tableDesc.addFamily(new HColumnDescriptor(columnFamily));

  42. admin.createTable(tableDesc);

  43. System.out.println("create table success!");

  44. }

  45. }

  46. // 加入一条记录

  47. public static void put(String tableName, String row, String columnFamily,

  48. String column, String data) throws IOException {

  49. HTable table = new HTable(getConfiguration(), tableName);

  50. Put p1 = new Put(Bytes.toBytes(row));

  51. p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes

  52. .toBytes(data));

  53. table.put(p1);

  54. System.out.println("put'" + row + "'," + columnFamily + ":" + column

  55. + "','" + data + "'");

  56. }

  57. // 读取一条记录

  58. public static void get(String tableName, String row) throws IOException {

  59. HTable table = new HTable(getConfiguration(), tableName);

  60. Get get = new Get(Bytes.toBytes(row));

  61. Result result = table.get(get);

  62. System.out.println("Get: " + result);

  63. }

  64. // 显示全部数据

  65. public static void scan(String tableName) throws IOException {

  66. HTable table = new HTable(getConfiguration(), tableName);

  67. Scan scan = new Scan();

  68. ResultScanner scanner = table.getScanner(scan);

  69. for (Result result : scanner) {

  70. System.out.println("Scan: " + result);

  71. }

  72. }

  73. // 删除表

  74. public static void delete(String tableName) throws IOException {

  75. HBaseAdmin admin = new HBaseAdmin(getConfiguration());

  76. if (admin.tableExists(tableName)) {

  77. try {

  78. admin.disableTable(tableName);

  79. admin.deleteTable(tableName);

  80. } catch (IOException e) {

  81. e.printStackTrace();

  82. System.out.println("Delete " + tableName + " 失败");

  83. }

  84. }

  85. System.out.println("Delete " + tableName + " 成功");

  86. }

  87. }

3、将日志文件导入Hbase表wlan_log中:

Java代码   Hbase调用JavaAPI实现批量导入操作

  1. import java.text.SimpleDateFormat;

  2. import java.util.Date;

  3. import org.apache.hadoop.conf.Configuration;

  4. import org.apache.hadoop.hbase.client.Put;

  5. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

  6. import org.apache.hadoop.hbase.mapreduce.TableReducer;

  7. import org.apache.hadoop.hbase.util.Bytes;

  8. import org.apache.hadoop.io.LongWritable;

  9. import org.apache.hadoop.io.NullWritable;

  10. import org.apache.hadoop.io.Text;

  11. import org.apache.hadoop.mapreduce.Counter;

  12. import org.apache.hadoop.mapreduce.Job;

  13. import org.apache.hadoop.mapreduce.Mapper;

  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  16. public class HbaseBatchImport {

  17. public static void main(String[] args) throws Exception {

  18. final Configuration configuration = new Configuration();

  19. // 设置zookeeper

  20. configuration.set("hbase.zookeeper.quorum", "hadoop1");

  21. // 设置hbase表名称

  22. configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");

  23. // 将该值改大,防止hbase超时退出

  24. configuration.set("dfs.socket.timeout", "180000");

  25. final Job job = new Job(configuration, "HBaseBatchImport");

  26. job.setMapperClass(BatchImportMapper.class);

  27. job.setReducerClass(BatchImportReducer.class);

  28. // 设置map的输出,不设置reduce的输出类型

  29. job.setMapOutputKeyClass(LongWritable.class);

  30. job.setMapOutputValueClass(Text.class);

  31. job.setInputFormatClass(TextInputFormat.class);

  32. // 不再设置输出路径。而是设置输出格式类型

  33. job.setOutputFormatClass(TableOutputFormat.class);

  34. FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");

  35. job.waitForCompletion(true);

  36. }

  37. static class BatchImportMapper extends

  38. Mapper<LongWritable, Text, LongWritable, Text> {

  39. SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");

  40. Text v2 = new Text();

  41. protected void map(LongWritable key, Text value, Context context)

  42. throws java.io.IOException, InterruptedException {

  43. final String[] splited = value.toString().split("\t");

  44. try {

  45. final Date date = new Date(Long.parseLong(splited[0].trim()));

  46. final String dateFormat = dateformat1.format(date);

  47. String rowKey = splited[1] + ":" + dateFormat;

  48. v2.set(rowKey + "\t" + value.toString());

  49. context.write(key, v2);

  50. } catch (NumberFormatException e) {

  51. final Counter counter = context.getCounter("BatchImport",

  52. "ErrorFormat");

  53. counter.increment(1L);

  54. System.out.println("出错了" + splited[0] + " " + e.getMessage());

  55. }

  56. };

  57. }

  58. static class BatchImportReducer extends

  59. TableReducer<LongWritable, Text, NullWritable> {

  60. protected void reduce(LongWritable key,

  61. java.lang.Iterable values, Context context)

  62. throws java.io.IOException, InterruptedException {

  63. for (Text text : values) {

  64. final String[] splited = text.toString().split("\t");

  65. final Put put = new Put(Bytes.toBytes(splited[0]));

  66. put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes

  67. .toBytes(splited[1]));

  68. // 省略其它字段,调用put.add(....)就可以

  69. context.write(NullWritable.get(), put);

  70. }

  71. };

  72. }

  73. }

4、查看导入结果:

Hbase调用JavaAPI实现批量导入操作

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
4cast
4castpackageloadcsv.KumarAwanish发布:2020122117:43:04.501348作者:KumarAwanish作者邮箱:awanish00@gmail.com首页:
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
HBase启动失败
如果在hbase的shell中输入了status报错,hbase(main):001:0statusERROR:org.apache.hadoop.hbase.ipc.ServerNotRunningYetException:Serverisnotrunningyetatorg.apache.ha
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这