Hadoop框架:MapReduce基本原理和入门案例

Stella981
• 阅读 574

本文源码:GitHub·点这里 || GitEE·点这里

一、MapReduce概述

1、基本概念

Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。

MapReduce既是一个编程模型,也是一个计算组件,处理的过程分为两个阶段,Map阶段:负责把任务分解为多个小任务,Reduce负责把多个小任务的处理结果进行汇总。其中Map阶段主要输入是一对Key-Value,经过map计算后输出一对Key-Value值;然后将相同Key合并,形成Key-Value集合;再将这个Key-Value集合转入Reduce阶段,经过计算输出最终Key-Value结果集。

2、特点描述

MapReduce可以实现基于上千台服务器并发工作,提供很强大的数据处理能力,如果其中单台服务挂掉,计算任务会自动转义到另外节点执行,保证高容错性;但是MapReduce不适应于实时计算与流式计算,计算的数据是静态的。

二、操作案例

1、流程描述

Hadoop框架:MapReduce基本原理和入门案例

数据文件一般以CSV格式居多,数据行通常以空格分隔,这里需要考虑数据内容特点;

文件经过切片分配在不同的MapTask任务中并发执行;

MapTask任务执行完毕之后,执行ReduceTask任务,依赖Map阶段的数据;

ReduceTask任务执行完毕后,输出文件结果。

2、基础配置

hadoop:
  # 读取的文件源
  inputPath: hdfs://hop01:9000/hopdir/javaNew.txt
  # 该路径必须是程序运行前不存在的
  outputPath: /wordOut

3、Mapper程序

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text mapKey = new Text();
    IntWritable mapValue = new IntWritable(1);

    @Override
    protected void map (LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
        // 1、读取行
        String line = value.toString();
        // 2、行内容切割,根据文件中分隔符
        String[] words = line.split(" ");
        // 3、存储
        for (String word : words) {
            mapKey.set(word);
            context.write(mapKey, mapValue);
        }
    }
}

4、Reducer程序

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum ;
    IntWritable value = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                        throws IOException, InterruptedException {
        // 1、累加求和统计
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 2、输出结果
        value.set(sum);
        context.write(key,value);
    }
}

5、执行程序

@RestController
public class WordWeb {

    @Resource
    private MapReduceConfig mapReduceConfig ;

    @GetMapping("/getWord")
    public String getWord () throws IOException, ClassNotFoundException, InterruptedException {
        // 声明配置
        Configuration hadoopConfig = new Configuration();
        hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
        Job job = Job.getInstance(hadoopConfig);

        // Job执行作业 输入路径
        FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath()));
        // Job执行作业 输出路径
        FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath()));

        // 自定义 Mapper和Reducer 两个阶段的任务处理类
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        // 设置输出结果的Key和Value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //执行Job直到完成
        job.waitForCompletion(true);
        return "success" ;
    }
}

6、执行结果查看

将应用程序打包放到hop01服务上执行;

java -jar map-reduce-case01.jar

Hadoop框架:MapReduce基本原理和入门案例

三、案例分析

1、数据类型

Java数据类型与对应的Hadoop数据序列化类型;

Java类型

Writable类型

Java类型

Writable类型

String

Text

float

FloatWritable

int

IntWritable

long

LongWritable

boolean

BooleanWritable

double

DoubleWritable

byte

ByteWritable

array

DoubleWritable

map

MapWritable

2、核心模块

Mapper模块:处理输入的数据,业务逻辑在map()方法中完成,输出的数据也是KV格式;

Reducer模块:处理Map程序输出的KV数据,业务逻辑在reduce()方法中;

Driver模块:将程序提交到yarn进行调度,提交封装了运行参数的job对象;

四、序列化操作

1、序列化简介

序列化:将内存中对象转换为二进制的字节序列,可以通过输出流持久化存储或者网络传输;

反序列化:接收输入字节流或者读取磁盘持久化的数据,加载到内存的对象过程;

Hadoop序列化相关接口:Writable实现的序列化机制、Comparable管理Key的排序问题;

2、案例实现

案例描述:读取文件,并对文件相同的行做数据累加计算,输出计算结果;该案例演示在本地执行,不把Jar包上传的hadoop服务器,驱动配置一致。

实体对象属性

public class AddEntity implements Writable {

    private long addNum01;
    private long addNum02;
    private long resNum;

    // 构造方法
    public AddEntity() {
        super();
    }
    public AddEntity(long addNum01, long addNum02) {
        super();
        this.addNum01 = addNum01;
        this.addNum02 = addNum02;
        this.resNum = addNum01 + addNum02;
    }

    // 序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(addNum01);
        dataOutput.writeLong(addNum02);
        dataOutput.writeLong(resNum);
    }
    // 反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // 注意:反序列化顺序和写序列化顺序一致
        this.addNum01  = dataInput.readLong();
        this.addNum02 = dataInput.readLong();
        this.resNum = dataInput.readLong();
    }
    // 省略Get和Set方法
}

Mapper机制

public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> {

    Text myKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 读取行
        String line = value.toString();

        // 行内容切割
        String[] lineArr = line.split(",");

        // 内容格式处理
        String lineNum = lineArr[0];
        long addNum01 = Long.parseLong(lineArr[1]);
        long addNum02 = Long.parseLong(lineArr[2]);

        myKey.set(lineNum);
        AddEntity myValue = new AddEntity(addNum01,addNum02);

        // 输出
        context.write(myKey, myValue);
    }
}

Reducer机制

public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> {

    @Override
    protected void reduce(Text key, Iterable<AddEntity> values, Context context)
            throws IOException, InterruptedException {

        long addNum01Sum = 0;
        long addNum02Sum = 0;

        // 处理Key相同
        for (AddEntity addEntity : values) {
            addNum01Sum += addEntity.getAddNum01();
            addNum02Sum += addEntity.getAddNum02();
        }

        // 最终输出
        AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum);
        context.write(key, addRes);
    }
}

案例最终结果:

Hadoop框架:MapReduce基本原理和入门案例

五、源代码地址

GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent

Hadoop框架:MapReduce基本原理和入门案例

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这