MapReduce Overview
Advantages and Disadvantages of MapReduce
Advantages
- Good scalability
- High fault tolerance

Disadvantages
- Not well suited to real-time computation
- Not well suited to stream processing
 
Core Programming Model of MapReduce

MapReduce Processes
- AppMaster: coordinates the scheduling and state of the entire job
- MapTask: handles the entire data processing flow of the Map phase
- ReduceTask: handles the entire data processing flow of the Reduce phase
WordCount Hands-on Example
Add dependencies to pom.xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Create log4j2.xml under the src/main/resources directory
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- The type is Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
    </Appenders>
    <Loggers>
        <!-- additivity is set to false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
        <!-- root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>
Write the Mapper class
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {
        // 1 Get one line
        String line = value.toString();
        // 2 Split it into words
        String[] words = line.split(" ");
        // 3 Emit each word with a count of 1
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
Write the Reducer class
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Sum the counts
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 2 Emit the word and its total count
        v.set(sum);
        context.write(key, v);
    }
}
Write the Driver class
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Set the jar by the driver class
        job.setJarByClass(WordcountDriver.class);
        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Package the jar and run it on Hadoop
hadoop jar <jar name> <fully qualified driver class> <input path> <output path>
# The input path is a directory
# The output path must not already exist
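For example, with a hypothetical jar name, package, and HDFS paths (adjust to your own):
hadoop jar wc.jar com.example.mr.WordcountDriver /user/input /user/output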
Hadoop Serialization
Serialization Overview
- Serialization: converting in-memory objects into a sequence of bytes so they can be persisted to disk and transmitted over the network
 
Common Data Serialization Types
| Java type | Hadoop Writable type | 
|---|---|
| Boolean | BooleanWritable | 
| Byte | ByteWritable | 
| Int | IntWritable | 
| Float | FloatWritable | 
| Long | LongWritable | 
| Double | DoubleWritable | 
| String | Text | 
| Map | MapWritable | 
| Array | ArrayWritable | 
| Null | NullWritable | 
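As a quick illustration (not part of the original notes) of how these Writable types serialize, here is a minimal round trip through a byte array using Text and IntWritable:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize: each Writable writes its own compact binary representation
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        new Text("hello").write(out);
        new IntWritable(42).write(out);

        // Deserialize: fields must be read back in the same order they were written
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + " -> " + count.get()); // prints: hello -> 42
    }
}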
Serialization Hands-on Example
Case Analysis

Write the Bean object for the traffic statistics
// 1 Implement the Writable interface
public class FlowBean implements Writable{
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    // 2 Deserialization uses reflection to call the no-arg constructor, so it must exist
    public FlowBean() {
        super();
    }
    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // 3 Serialization: write the fields
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    // 4 Deserialization: read the fields
    // 5 The read order must be exactly the same as the write order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    // 6 toString makes it easy to print the bean to text output later
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
Write the Mapper class
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    FlowBean v = new FlowBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {
        // 1 Get one line
        String line = value.toString();
        // 2 Split the fields
        String[] fields = line.split("\t");
        // 3 Populate the output objects
        // Extract the phone number
        String phoneNum = fields[1];
        // Extract the upstream and downstream traffic
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        k.set(phoneNum);
        v.setDownFlow(downFlow);
        v.setUpFlow(upFlow);
        // 4 Write out the kv pair
        context.write(k, v);
    }
}
Write the Reducer class
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_downFlow = 0;
        // 1 Iterate over all the beans and accumulate the upstream and downstream traffic separately
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }
        // 2 Build the result bean; the two-argument constructor computes sumFlow
        FlowBean result = new FlowBean(sum_upFlow, sum_downFlow);
        context.write(key, result);
    }
}
Write the Driver class
public class FlowsumDriver {
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Specify the local path of this program's jar (via the driver class)
        job.setJarByClass(FlowsumDriver.class);
        // 3 Specify the Mapper/Reducer classes this job uses
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 4 Specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5 Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6 Specify the directory of the job's raw input files and the output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 Submit the job configuration and the jar containing its classes to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
MapReduce Framework Internals
Data Splitting and the Mechanism Determining MapTask Parallelism

FileInputFormat Split Mechanism
- The split size defaults to the block size (see the sketch below for how it is computed and adjusted)
- Splitting does not consider the dataset as a whole; each file is split individually
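A minimal sketch, assuming the standard Hadoop split-size formula, of how the split size can be influenced from the driver (the values are illustrative):
// The framework computes splitSize = max(minSize, min(maxSize, blockSize)),
// so by default splitSize equals the block size.
// Raising minSize above the block size makes splits larger;
// lowering maxSize below the block size makes splits smaller.
FileInputFormat.setMinInputSplitSize(job, 1);                  // keep the default minimum
FileInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L); // cap splits at 128 MB (illustrative)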
 
CombineTextInputFormat
- A large number of small files produces a large number of MapTasks and processing efficiency becomes extremely low; CombineTextInputFormat packs several small files into a single split (see the driver snippet below)
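A minimal driver-side sketch, assuming CombineTextInputFormat is wanted (the 4 MB threshold is illustrative):
// Use CombineTextInputFormat instead of the default TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Small files are packed into virtual splits of at most this size (4 MB here)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);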

 
Detailed MapReduce Workflow

The Shuffle Process in Detail
- The MapTask collects the kv pairs emitted by map() into an in-memory buffer
- The buffer repeatedly spills to local disk, possibly producing several spill files
- The multiple spill files are merged into one large file
- During spilling and merging, the Partitioner is called to partition the data and the records are sorted by key
- Each ReduceTask goes to every MapTask's machine and fetches the result data for its own partition
- A ReduceTask pulls the files of the same partition from the different MapTasks and merges them again (merge sort)
- Once they are merged into one large file, the shuffle is over
- Note: the buffer size used in shuffle affects the execution efficiency of a MapReduce program; in principle, the larger the buffer, the fewer disk I/O operations and the faster the job runs (see the snippet below)
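A small sketch of how that buffer can be tuned from the driver's Configuration; mapreduce.task.io.sort.mb is the standard property (100 MB by default), and the values here are only illustrative:
// Enlarge the map-side sort buffer from the default 100 MB to 200 MB
configuration.set("mapreduce.task.io.sort.mb", "200");
// Spill to disk when the buffer is 80% full (the default threshold)
configuration.set("mapreduce.map.sort.spill.percent", "0.80");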
 
Shuffle Mechanism Diagram

Partition Hands-on Example
Case Analysis

Write the Partitioner class
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 Get the first three digits of the phone number
        String preNum = key.toString().substring(0, 3);
        // Any other prefix falls into partition 4
        int partition = 4;
        // 2 Decide which province (partition) the number belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}
In the Driver class, register the custom partitioner and set the number of ReduceTasks (five here, matching the five partitions)
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
WritableComparable Sorting Example
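The notes give no code for this case; a minimal sketch, assuming we sort the FlowBean above by total flow in descending order (the class name FlowBeanSortable is hypothetical):
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// A key type must implement WritableComparable so the framework can sort by it during shuffle
public class FlowBeanSortable implements WritableComparable<FlowBeanSortable> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBeanSortable() { }   // required: deserialization calls the no-arg constructor via reflection

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBeanSortable o) {
        // Sort by total flow in descending order
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}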

Combiner Example
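No code is given here either; a minimal sketch of the usual WordCount-style setup, assuming the WordcountReducer above is reused (a Combiner must consume and produce the same key/value types as the map output):
// The WordCount reduce logic is associative and commutative, so the Reducer can double as the Combiner.
// Add this in the Driver before submitting the job:
job.setCombinerClass(WordcountReducer.class);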

Map Join
- Map Join is suitable when one table is very small and the other is very large
 
Case Requirements

MapJoinDriver
// Load the cached (small-table) data
job.addCacheFile(new URI("hdfs://node01/cache/pd.txt"));
// A map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
job.setNumReduceTasks(0);
Read the cached file in the setup method of MapJoinMapper
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Small-table cache: pid -> pname, populated once per MapTask
    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    // Cache the pd data into pdMap before the task starts processing records
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the small-table data (pd.txt) from the cache files
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        // Get a file system object and open an input stream
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);
        // Wrap the stream in a reader so the file can be read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        // Read and process the file line by line
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // Split the line, e.g. "01  小米" (pid and pname separated by a tab)
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }
        // Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read a line of the big-table data, e.g. "1001  01  1"
        String[] fields = value.toString().split("\t");
        // Use the pid from the big-table row to look up the pname in pdMap
        String pname = pdMap.get(fields[1]);
        // Replace the pid in the big-table row with the pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        // Write out
        context.write(text, NullWritable.get());
    }
}
Yarn Resource Scheduler
Yarn Architecture

How Yarn Works
- The MR program is submitted on the node where the client runs
- YarnRunner asks the ResourceManager for an Application
- The RM returns the application's resource submission path to YarnRunner
- The program submits the resources it needs to run to HDFS
- Once the resources are submitted, it requests that the AppMaster be run
- The RM initializes the user request as a Task
- One of the NodeManagers picks up the Task
- That NodeManager creates a Container and launches the AppMaster in it
- The Container copies the resources from HDFS to the local node
- The AppMaster asks the RM for resources to run the MapTasks
- The RM assigns the MapTask work to two other NodeManagers, each of which picks up its task and creates a container
- The MR AppMaster sends program start scripts to the two NodeManagers that received tasks; they launch the MapTasks, which partition and sort the data
- After all MapTasks finish, the AppMaster asks the RM for containers to run the ReduceTasks
- The ReduceTasks fetch the data of their corresponding partitions from the MapTasks
- When the program finishes, the MR AppMaster asks the RM to deregister it
 
Capacity Scheduler

- Capacity guarantee
  - Administrators can set a minimum guaranteed share and an upper usage limit for each queue, and all applications submitted to that queue share these resources
- Flexibility
  - If a queue has spare resources, they can be temporarily shared with queues that need them
  - Once a new application is submitted to that queue, the resources lent to other queues are returned to it
 
 
Capacity Scheduler Multi-Queue Submission Example
- Yarn ships with a single-queue scheduler by default, so in practice a single job can block the entire queue
- At the same time, companies need to limit cluster usage per business line
- This requires configuring multiple job queues by business type
 
Modify capacity-scheduler.xml
<!-- Define multiple queues: add a hive queue -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
</property>
<!-- Lower the default queue's guaranteed capacity to 40% (the default is 100%) -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>
<!-- Lower the default queue's maximum capacity to 60% (the default is 100%) -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>
<!-- Set the hive queue's guaranteed capacity -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>
<!-- Set the hive queue's maximum capacity -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value>80</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>
After the configuration is complete, refresh the queues with the command:
yarn rmadmin -refreshQueues
By default jobs are submitted to the default queue; to submit a job to another queue, declare the queue in the Driver class:
configuration.set("mapreduce.job.queuename","hive");
