Hadoop使用学习笔记(3)

Wesley13
• 阅读 660

Hadoop使用学习笔记

2. 基本Map-Reduce工作配置与原理(下)

我们先用老版本的API编写,下一篇会用新的API,并解释区别:
环境配置:
提交Job,开发IDE所在机器环境:Windows 7,4C8G,IntelliJ IDEA 15.
Hadoop集群环境:第一篇中已经提到,Linux环境的集群。

由于我们是跨环境提交任务,所以源代码和配置上多了很多麻烦事。
首先,确认windows系统能识别hadoop集群的域名,如果不能,先修改C:/Windows/System32/drivers/etc/hosts文件,添加域名解析。
我们之后把之前Linux上的hadoop拉下来到我们的windows系统中。其实只拉配置目录就行,我们只需要其中的配置文件。

我们在IDEA中新建maven工程,比如叫HadoopT。修改pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hash.test</groupId>
    <artifactId>hadoopT</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop_version>2.7.2</hadoop_version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop_version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

我们这个工程需要common模块(hdfs还有client模块依赖),hdfs模块(访问HDFS)还有client模块(提交任务)。
之后编写词语统计WordCount类:

package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/** * @author Hash Zhang * @version 1.0.0 * @date 2016/7/20 */
public class WordCount {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        //如果文件夹存在,则删除
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        //设置工作类,就是main方法所在类
        JobConf jobConf = new JobConf(WordCount.class);
        //配置需要运行的JAR在本地的位置,就是本类所在的JAR包
        jobConf.set("mapreduce.job.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        //远程Hadoop集群的用户,防止没有权限
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        //设置Job名称
        jobConf.setJobName("My Word Count");
        //设置每阶段输出格式
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        //设置Map的类和reduce的类
        jobConf.setMapperClass(Map.class);
        jobConf.setReducerClass(Reduce.class);
        //设置输入输出格式
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        //设置输入输出路径,远程Hdfs需要加链接地址
        FileInputFormat.setInputPaths(jobConf, args[0]);
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        //先删除输出目录
        deleteDir(jobConf, args[1]);
        //执行Job
        JobClient.runJob(jobConf);
    }
}

class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text key = new Text();

    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String line = text.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            key.set(stringTokenizer.nextToken());
            outputCollector.collect(key, one);
        }
    }
}

class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        int sum = 0;
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        outputCollector.collect(text, new IntWritable(sum));
    }
}

源代码结合我们之前的图很好理解
Hadoop使用学习笔记(3)
这里我们重点强调如下:
因为/test/output/每次会被生成,所以,每次执行这个程序,程序开始会先清空输出目录。保证不出错。
由于是本地将任务提交到远程,我们需要指定这个工程需要生成的jar包,通过设置mapred.jar属性来实现。
这里,我配置了WordCount 执行的配置:
Hadoop使用学习笔记(3)
这里我们先修改Program Aruguments,程序中我们取第一个参数为输入文件夹,第二个为输出。这里我们配置的都在HDFS上。
之后在下面的运行配置中添加“Run Maven Goal “HadoopT: Package””。这样,保证我们在代码中配置的jar永远是最新(这个jar地址就是maven package后生成的jar包地址)的。

jobConf.set("mapred.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");

之后,在nosql1上利用hdfs命令,添加file1和file2至hdfs上的/test/input目录。注意,用户一定要是hadoop。
Hadoop使用学习笔记(3)

要指定Hadoop用户,否则没有权限执行Map-red job。这个用户使用System.setproperties来配置,因为每个Hadoop应用可能不一样。
之前我们在linux下用的hadoop用户,所以在这里我们设置:

System.setProperty("HADOOP_USER_NAME", "hadoop");

之后,我们在IDEA中运行,输出如下(注意,我们的日志级别是WARN):

16/08/03 11:10:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable hdfs://nosql1:9000/test/output has been deleted sucessfullly. 16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032 16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032 16/08/03 11:10:45 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 16/08/03 11:10:46 INFO mapred.FileInputFormat: Total input paths to process : 2 16/08/03 11:10:46 INFO mapreduce.JobSubmitter: number of splits:3 16/08/03 11:10:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1470040052262_0010 16/08/03 11:10:47 INFO impl.YarnClientImpl: Submitted application application_1470040052262_0010 16/08/03 11:10:47 INFO mapreduce.Job: The url to track the job: http://nosql1:8088/proxy/application_1470040052262_0010/ 16/08/03 11:10:47 INFO mapreduce.Job: Running job: job_1470040052262_0010 16/08/03 11:10:54 INFO mapreduce.Job: Job job_1470040052262_0010 running in uber mode : false 16/08/03 11:10:54 INFO mapreduce.Job: map 0% reduce 0% 16/08/03 11:10:59 INFO mapreduce.Job: map 100% reduce 0% 16/08/03 11:11:05 INFO mapreduce.Job: map 100% reduce 100% 16/08/03 11:11:05 INFO mapreduce.Job: Job job_1470040052262_0010 completed successfully 16/08/03 11:11:05 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=93 FILE: Number of bytes written=476815 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=311 HDFS: Number of bytes written=39 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=3 Launched reduce tasks=1 Data-local map tasks=3 Total time spent by all maps in occupied slots (ms)=9380 Total time spent by all reduces in occupied slots (ms)=3400 Total time spent by all map tasks (ms)=9380 Total time spent by all reduce tasks (ms)=3400 Total vcore-milliseconds taken by all map tasks=9380 Total vcore-milliseconds taken by all reduce tasks=3400 Total megabyte-milliseconds taken by all map tasks=9605120 Total megabyte-milliseconds taken by all reduce tasks=3481600 Map-Reduce Framework Map input records=5 Map output records=8 Map output bytes=71 Map output materialized bytes=105 Input split bytes=261 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=105 Reduce input records=8 Reduce output records=6 Spilled Records=16 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=114 CPU time spent (ms)=2180 Physical memory (bytes) snapshot=1001734144 Virtual memory (bytes) snapshot=3987656704 Total committed heap usage (bytes)=805306368 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=50 File Output Format Counters Bytes Written=39

之后,我们通过YARN可以看到这个Job的输出和记录。
我们编写一个本地程序,输出输出目录的文件内容:

package com.hash.test.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/** * @author Hash Zhang * @version 1.0.0 * @date 2016/7/20 */
public class TestHDFS {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://nosql1:9000/";
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);



        // 显示在hdfs的/user/fkong下指定文件的内容
        InputStream is = fs.open(new Path("/test/output/part-00000"));
        IOUtils.copyBytes(is, System.out, 1024, true);
    }
}

执行后输出为:

aa  1
apple   3
asd 1
bbb 1
ccccc   1
egg 1

用新的API如下:

package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

/** * @author Hash Zhang * @version 1.0.0 * @date 2016/8/3 */
public class WordCount extends Configured implements Tool {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        //如果文件夹存在,则删除
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public int run(String[] strings) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "sfdba");

        Job job = Job.getInstance();
        job.setJar("D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        //设置Job名称
        job.setJobName("My Word Count");
        //设置每阶段输出格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置Map的类和reduce的类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        //设置输入输出格式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输入输出路径,远程Hdfs需要加链接地址
        FileInputFormat.setInputPaths(job, strings[0]);
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));
        //先删除输出目录
        deleteDir(job.getConfiguration(), strings[1]);

        final boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WordCount(),args);
        System.exit(ret);
    }
}

class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}

class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

之后的演示,我们都会使用新版的API。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
List的Select 和Select().tolist()
List<PersondelpnewList<Person{newPerson{Id1,Name"小明1",Age11,Sign0},newPerson{Id2,Name"小明2",Age12,
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这