MAPREDUCER学习笔记

Wesley13
• 阅读 618

MAPREDUCE基本原理          


一,概念理解

  1,Mapreduce是一个分布式运算程序的编程架构,相对于HDFS来说就是客户端。其核心功能就是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并运行在一个hadoop集群上。

  2,基本整体架构:MEAppMaster,MapTask,ReduceTask。

二,MapReduce框架结构及核心运行机制

   1,结构:MRAppMaster:负责整个程序的过程调度及状态协调;MapTask:负责map阶段的整个数据处理流程---》数据处理;ReduceTask:负责reduce阶段的整个数据处理流程---》数据运算。

   2,MR程序运行的流程(以wordcount为例子)

MAPREDUCER学习笔记

   一个mr程序启动之后,首先执行的是MRAppMatser,启动之后,根据job的描述,计算出所需的MapTask的实例个数,然后向集群申请所需的MapTask的个数。

  MapTask启动后根据给定的切片范围进行数据处理。

  MRAppMatser监控到所有的MapTask执行完毕后,会根据客户指定的参数启动相应的数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围。

  执行ReducerTask,将运算结果通过用户指定的outputformat将结果输出到外部存储。

三,MapReduce程序的运行模式

  1,本地运行模式:而处理的数据及输出结果可以在本地文件系统。

  2,集群运行模式:将mapreduce程序提交给yarn集群resourcemanager,分发到很多的节点上并发执行处理的数据和输出结果应该位于hdfs文件系统。

四,wordcount示例编写

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数

1,定义一个mapper类

package com;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*KEYIN指的是输入的Key
 * VALUEUIN指的是输入的value
 * KEYOUT输出的key值
 * VALUEOUT输出的value值
*/
public class wordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    //处理MRMapperMaster分配过来的数据,我们实现业务
    /*LongWritable  key  读取文件内容的偏移量
     * Text value  文本neirong
     * Context Context Mapperduce的上下文 
     * 
     * */
    //从写map方法
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        //每一行数据
        String data =value.toString();
        //获取每行单词
        String[] split =data.split(" ");
        //遍历数组
        for (String string : split) {
            //读取的每个单词都设置为key为单词,value为1
            context.write(new Text(string), new IntWritable(1));
        }
    }
}

  2,定义一个reducer类

package com;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * reduce的KEYIN指的是输入的key
 * Reduce的ValueIn 指的是输入的value
 * Redude的keyout指的是 输出的key
 * Reduce 的valueout 指的是 输出的value值
 * 
 * 
 * */
public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
     //重写reduce方法
    @Override
    protected void reduce(Text keyin, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        //统计出现单词的次数
        int count =0;
        //循环一共出现的次数并累加
        Iterator<IntWritable>iter =values.iterator();
        while(iter.hasNext()){
            IntWritable next =iter.next();
            count+=next.get();
        }
        context.write(keyin,new IntWritable(count));
    }  
}

  3,定义一个主类,用来描述job并提交job

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class wordCountSubmit {
    public static void main(String[] args) throws Exception {
        Configuration conf=new JobConf();
        conf.set("fs.defaultFS", "hdfs://192.168.184.134:9000");
      
        Job wcj=Job.getInstance(conf);
        //设置jar包所需要执行的class类
        wcj.setJarByClass(wordCountSubmit.class);
        //设置reduce所在的class类
        wcj.setReducerClass(wordCountReducer.class);
        //设置mapper所在的class类
        wcj.setMapperClass(wordCountMapper.class);
        
        //设置mapper的输出数据类型
        wcj.setMapOutputKeyClass(Text.class);
        wcj.setMapOutputValueClass(IntWritable.class);
        //设置reduce的输出数据类型
        wcj.setOutputKeyClass(Text.class);
        wcj.setOutputValueClass(IntWritable.class);
        
        //设置文件的输入输出位置
        FileInputFormat.setInputPaths(wcj, "/sum");
        FileOutputFormat.setOutputPath(wcj, new Path("/sum2"));//此路径下的sum2在hdfs中不能存在否则报错
        //打包的jar包在虚拟机的home目录下就可以运行,不要在Hadoop的hdfs文件管理系统下运行
        //提交
        wcj.waitForCompletion(true);
    }
}

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
MapReduce编程模型和计算框架架构原理
Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。MapReduce编程模型
Stella981 Stella981
3年前
MapReduce 基本原理(MP用于分布式计算)
hadoop最主要的2个基本的内容要了解。上次了解了一下HDFS,本章节主要是了解了MapReduce的一些基本原理。MapReduce文件系统:它是一种编程模型,用于大规模数据集(大于1TB)的并行运算。MapReduce将分为两个部分:Map(映射)和Reduce(归约)。当你向mapreduce框架提交一个计算作业,它会首先把计算作业分成若干个
Stella981 Stella981
3年前
Hadoop之Mapreduce详解
1、什么是Mapreduce   Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;2、Mapreduce框架结构及核心运行机制
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
Hadoop技术原理总结
Hadoop技术原理总结1、Hadoop运行原理Hadoop是一个开源的可运行于大规模集群上的分布式并行编程框架,其最核心的设计包括:MapReduce和HDFS。基于Hadoop,你可以轻松地编写可处理海量数据的分布式并行程序,并将其运行于由成百上千个结点组成的大规模计算机集群上。基于MapReduce计算模型编写分布式并行程序相对简单,