Hadoop Learning Notes
2. Basic Map-Reduce Job Configuration and Principles (Part 2)
We will first write the program with the old API; later we will switch to the new API and explain the differences:
Environment:
Machine that submits the job and runs the development IDE: Windows 7, 4 cores / 8 GB RAM, IntelliJ IDEA 15.
Hadoop cluster: the Linux cluster already described in the first post.
Because we are submitting jobs across environments, there is quite a bit of extra work in both source code and configuration.
First, make sure Windows can resolve the hostnames of the Hadoop cluster. If it cannot, edit C:/Windows/System32/drivers/etc/hosts and add the name mappings.
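For example, the cluster node nosql1 used later in this post resolves to 10.202.7.184 (as seen in the job log below), so the hosts entry would look like the line below; add any other cluster nodes in the same way:
10.202.7.184  nosql1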
Next, copy the Hadoop installation from the Linux cluster down to the Windows machine. In fact, copying only the configuration directory is enough, because all we need are the configuration files in it.
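The simplest way to make these files take effect is to put the copied *-site.xml files on the project classpath (for example under src/main/resources in the Maven project we create next). Alternatively they can be loaded explicitly; here is a minimal sketch, assuming a hypothetical local directory D:/hadoop-conf that holds the copied files (Configuration and Path are the same Hadoop classes imported in the WordCount class further down):
Configuration conf = new Configuration();
// Hypothetical local directory containing the configuration files copied from the cluster
conf.addResource(new Path("D:/hadoop-conf/core-site.xml"));
conf.addResource(new Path("D:/hadoop-conf/hdfs-site.xml"));
conf.addResource(new Path("D:/hadoop-conf/mapred-site.xml"));
conf.addResource(new Path("D:/hadoop-conf/yarn-site.xml"));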
Create a new Maven project in IDEA, named HadoopT for example, and edit its pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hash.test</groupId>
    <artifactId>hadoopT</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop_version>2.7.2</hadoop_version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
This project needs the hadoop-common module (which the hdfs and client modules depend on), the hadoop-hdfs module (for accessing HDFS), and the hadoop-client module (for submitting jobs).
Next, write the word-count class WordCount:
package com.hash.test.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class WordCount {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted successfully.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Set the job's main class, i.e. the class that contains the main method
        JobConf jobConf = new JobConf(WordCount.class);
        // Local path of the JAR to be submitted, i.e. the JAR that contains this class
        jobConf.set("mapreduce.job.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // User name for the remote Hadoop cluster, to avoid permission problems
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        // Set the job name
        jobConf.setJobName("My Word Count");
        // Set the output key/value types of each stage
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        // Set the mapper and reducer classes
        jobConf.setMapperClass(Map.class);
        jobConf.setReducerClass(Reduce.class);
        // Set the input and output formats
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        // Set the input and output paths; for a remote HDFS they must include the full hdfs:// address
        FileInputFormat.setInputPaths(jobConf, args[0]);
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        // Delete the output directory first
        deleteDir(jobConf, args[1]);
        // Run the job
        JobClient.runJob(jobConf);
    }
}
class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text key = new Text();

    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String line = text.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            key.set(stringTokenizer.nextToken());
            outputCollector.collect(key, one);
        }
    }
}
class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        int sum = 0;
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        outputCollector.collect(text, new IntWritable(sum));
    }
}
The source code is easy to follow when read together with the diagram from the previous post.
A few points deserve emphasis:
Because /test/output/ is regenerated on every run, the program clears the output directory at startup each time it is executed, so the job does not fail because the directory already exists.
Since we submit the job from the local machine to the remote cluster, we have to tell the job which JAR this project produces; this is done by setting the mapreduce.job.jar (historically mapred.jar) property.
Here is the run configuration I set up for WordCount:
First edit the Program Arguments: the program takes its first argument as the input directory and its second as the output directory; in this setup both are on HDFS.
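For example, using the HDFS paths that appear later in this post, the Program Arguments would be:
hdfs://nosql1:9000/test/input hdfs://nosql1:9000/test/output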
Then, further down in the run configuration, add "Run Maven Goal 'HadoopT: Package'". This ensures that the JAR path configured in the code always points to a fresh build (that path is exactly where maven package writes the generated JAR):
jobConf.set("mapreduce.job.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
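The "HadoopT: Package" goal above does the same thing as packaging from the command line; building the JAR manually would look like this:
mvn clean package
which writes hadoopT-1.0-SNAPSHOT.jar into the project's target directory, i.e. the path referenced in jobConf.set above.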
Next, on nosql1, use the hdfs command to upload file1 and file2 into the /test/input directory on HDFS. Note that this must be done as the hadoop user.
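A minimal sketch of those commands, run as the hadoop user on nosql1 (assuming file1 and file2 are in the current directory):
hdfs dfs -mkdir -p /test/input
hdfs dfs -put file1 file2 /test/input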
The Hadoop user must be specified, otherwise we have no permission to run the MapReduce job. This user is configured with System.setProperty, because it may differ from one Hadoop application to another.
Since we used the hadoop user on Linux earlier, here we set:
System.setProperty("HADOOP_USER_NAME", "hadoop");
After that, run the program from IDEA; the output is as follows (note that our log level is WARN):
16/08/03 11:10:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hdfs://nosql1:9000/test/output has been deleted successfully.
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/08/03 11:10:46 INFO mapred.FileInputFormat: Total input paths to process : 2
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: number of splits:3
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1470040052262_0010
16/08/03 11:10:47 INFO impl.YarnClientImpl: Submitted application application_1470040052262_0010
16/08/03 11:10:47 INFO mapreduce.Job: The url to track the job: http://nosql1:8088/proxy/application_1470040052262_0010/
16/08/03 11:10:47 INFO mapreduce.Job: Running job: job_1470040052262_0010
16/08/03 11:10:54 INFO mapreduce.Job: Job job_1470040052262_0010 running in uber mode : false
16/08/03 11:10:54 INFO mapreduce.Job: map 0% reduce 0%
16/08/03 11:10:59 INFO mapreduce.Job: map 100% reduce 0%
16/08/03 11:11:05 INFO mapreduce.Job: map 100% reduce 100%
16/08/03 11:11:05 INFO mapreduce.Job: Job job_1470040052262_0010 completed successfully
16/08/03 11:11:05 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=93
        FILE: Number of bytes written=476815
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=311
        HDFS: Number of bytes written=39
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=9380
        Total time spent by all reduces in occupied slots (ms)=3400
        Total time spent by all map tasks (ms)=9380
        Total time spent by all reduce tasks (ms)=3400
        Total vcore-milliseconds taken by all map tasks=9380
        Total vcore-milliseconds taken by all reduce tasks=3400
        Total megabyte-milliseconds taken by all map tasks=9605120
        Total megabyte-milliseconds taken by all reduce tasks=3481600
    Map-Reduce Framework
        Map input records=5
        Map output records=8
        Map output bytes=71
        Map output materialized bytes=105
        Input split bytes=261
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=105
        Reduce input records=8
        Reduce output records=6
        Spilled Records=16
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=114
        CPU time spent (ms)=2180
        Physical memory (bytes) snapshot=1001734144
        Virtual memory (bytes) snapshot=3987656704
        Total committed heap usage (bytes)=805306368
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=50
    File Output Format Counters
        Bytes Written=39
Afterwards, the job's progress and history can be viewed through the YARN web UI (see the tracking URL in the log above).
We then write a small local program that prints the contents of the files in the output directory:
package com.hash.test.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class TestHDFS {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://nosql1:9000/";
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);
        // Print the contents of the job's output file on HDFS
        InputStream is = fs.open(new Path("/test/output/part-00000"));
        IOUtils.copyBytes(is, System.out, 1024, true);
    }
}
Running it prints:
aa 1
apple 3
asd 1
bbb 1
ccccc 1
egg 1
The same job written with the new API looks like this:
package com.hash.test.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/8/3
 */
public class WordCount extends Configured implements Tool {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted successfully.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public int run(String[] strings) throws Exception {
        // User name for the remote Hadoop cluster, to avoid permission problems
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        Job job = Job.getInstance();
        // Local path of the JAR to be submitted, i.e. the JAR that contains this class
        job.setJar("D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // Set the job name
        job.setJobName("My Word Count");
        // Set the output key/value types of each stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the mapper and reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // Set the input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the input and output paths; for a remote HDFS they must include the full hdfs:// address
        FileInputFormat.setInputPaths(job, strings[0]);
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));
        // Delete the output directory first
        deleteDir(job.getConfiguration(), strings[1]);
        final boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}
class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}
class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
From now on, all of the demos will use the new API.