MapReduce

Stella981
• 阅读 707

MapReduce-从HBase读取处理后再写入HBase

代码如下

package com.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @author:FengZhen
* @create:2018年9月17日
* 从HBase读写入HBase
* zip -d HBaseToHBase.jar 'META-INF/.SF' 'META-INF/.RSA' 'META-INF/*SF'
*/
public class HBaseToHBase extends Configured implements Tool{

    // ZooKeeper quorum / client port for the HBase cluster this job talks to.
    private static String addr="HDP233,HDP232,HDP231";
    private static String port="2181";

    /** Job counters for basic row/column/outcome statistics. */
    public enum Counters { ROWS, COLS, VALID, ERROR, EMPTY, NOT_EMPTY}

    /**
     * Mapper: reads each row of the source HBase table and, for every cell,
     * splits the string value into a "top" half and a "detail" half, emitting
     * a single Put per row keyed by the original row key. The job is map-only
     * (numReduceTasks == 0), so these Puts go straight to TableOutputFormat.
     */
    static class ParseMapper extends TableMapper<ImmutableBytesWritable, Put>{
        private byte[] columnFamily = null;

        @Override
        protected void setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                throws IOException, InterruptedException {
            // The target column family is handed over by the driver via the job conf.
            columnFamily = Bytes.toBytes(context.getConfiguration().get("conf.columnfamily"));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                throws IOException, InterruptedException {
            context.getCounter(Counters.ROWS).increment(1);
            String hbaseValue = null;

            Put put = new Put(key.get());
            for (Cell cell : value.listCells()) {
                context.getCounter(Counters.COLS).increment(1);
                hbaseValue = Bytes.toString(CellUtil.cloneValue(cell));
                if (hbaseValue.length() > 0) {
                    String top = hbaseValue.substring(0, hbaseValue.length()/2);
                    // Fix: take the remainder through the end of the string. The
                    // original end index of length()-1 silently dropped the last
                    // character of every non-empty value.
                    String detail = hbaseValue.substring(hbaseValue.length()/2);
                    put.addColumn(columnFamily, Bytes.toBytes("top"), Bytes.toBytes(top));
                    put.addColumn(columnFamily, Bytes.toBytes("detail"), Bytes.toBytes(detail));
                    context.getCounter(Counters.NOT_EMPTY).increment(1);
                }else {
                    put.addColumn(columnFamily, Bytes.toBytes("empty"), Bytes.toBytes(hbaseValue));
                    context.getCounter(Counters.EMPTY).increment(1);
                }
            }
            try {
                context.write(key, put);
                context.getCounter(Counters.VALID).increment(1);
            } catch (Exception e) {
                e.printStackTrace();
                context.getCounter(Counters.ERROR).increment(1);
            }
        }
    }

    /**
     * Identity-style reducer that forwards every Put unchanged. Currently unused
     * (the driver runs map-only); kept so the job can be switched back to a
     * reduce phase by re-enabling setReducerClass in run().
     */
    static class ParseTableReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(key, put);
            }
        }
    }

    /**
     * Configures and runs the HBase-to-HBase job.
     *
     * @param arg0 [0] source table, [1] column spec "family:qualifier" (or just
     *             "family"; may be null to scan all columns), [2] destination table
     * @return 0 on success, 1 on failure
     */
    public int run(String[] arg0) throws Exception {
        String table = arg0[0];
        String column = arg0[1];
        String destTable = arg0[2];

        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum",addr);
        configuration.set("hbase.zookeeper.property.clientPort", port);

        Scan scan = new Scan();
        if (null != column) {
            // parseColumn splits "family:qualifier" into its byte[] parts; a bare
            // family name yields a single-element array.
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            if (colkey.length > 1) {
                scan.addColumn(colkey[0], colkey[1]);
                configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
                configuration.set("conf.columnqualifier", Bytes.toString(colkey[1]));
            }else {
                scan.addFamily(colkey[0]);
                configuration.set("conf.columnfamily", Bytes.toString(colkey[0]));
            }
        }

        Job job = Job.getInstance(configuration);
        job.setJobName("HBaseToHBase");
        // Fix: this class is HBaseToHBase; the original referenced a nonexistent
        // HBaseToHBase2 class and did not compile.
        job.setJarByClass(HBaseToHBase.class);

        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, destTable);

        job.setMapperClass(ParseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

//        job.setReducerClass(ParseTableReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setInputFormatClass(TableInputFormat.class);
        if (null != column) {
            // Fix: guard against NPE — the earlier null check implies column may be
            // absent, in which case the scan should remain unrestricted.
            TableInputFormat.addColumns(scan, KeyValue.parseColumn(Bytes.toBytes(column)));
        }
        job.setOutputFormatClass(TableOutputFormat.class);

        // Map-only job: Puts are written directly by TableOutputFormat.
        job.setNumReduceTasks(0);

        // TableMapReduceUtil is intentionally not used: it triggers
        // ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
        // on this cluster, so input/output are wired up manually above.
//        TableMapReduceUtil.initTableMapperJob(table, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job);
//        TableMapReduceUtil.initTableReducerJob(table, IdentityTableReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[] {"test_table_mr", "data:info", "test_table_dest"};
        // Fix: instantiate this class, not the nonexistent HBaseToHBase2.
        int exitCode = ToolRunner.run(new HBaseToHBase(), params);
        System.exit(exitCode);
    }
}

 打包测试

zip -d HBaseToHBase.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF'
hadoop jar HBaseToHBase.jar com.hbase.mapreduce.HBaseToHBase

出现的问题

一开始使用了TableMapReduceUtil,但是报下面这个错

Exception in thread "main" java.lang.NoClassDefFoundError: com/yammer/metrics/core/MetricsRegistry
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(TableMapReduceUtil.java:732)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:777)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:212)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:168)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:291)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:92)
    at com.hbase.mapreduce.HBaseToHBase.run(HBaseToHBase.java:108)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.hbase.mapreduce.HBaseToHBase.main(HBaseToHBase.java:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.MetricsRegistry
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 16 more

 解决:不使用TableMapReduceUtil,分步设置输入输出便可解决此问题

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这