TemperatureMR.java
package cn.kissoft.hadoop.week05;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TemperatureMR {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TemperatureMR <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(TemperatureMR.class);  // locate the jar containing this class
        job.setJobName("Max and min temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);

        // Output key/value types of the reducer (and, here, of the mapper too)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
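The run log below warns "Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same." A minimal sketch of the same driver wired through ToolRunner, which silences that warning and also picks up the standard -D/-conf/-fs options (TemperatureDriver is a name chosen here for illustration, not from the original code):

package cn.kissoft.hadoop.week05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical Tool-based variant of TemperatureMR (same old-style API)
public class TemperatureDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TemperatureDriver <input path> <output path>");
            return -1;
        }
        Job job = new Job(getConf(), "Max and min temperature");
        job.setJarByClass(TemperatureDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner applies GenericOptionsParser before calling run()
        System.exit(ToolRunner.run(new Configuration(), new TemperatureDriver(), args));
    }
}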
TemperatureMapper.java
package cn.kissoft.hadoop.week05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Sentinel value used in the data for a missing temperature reading
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4);  // year in columns 0-3
        // Temperature in columns 13-18, a signed integer; NCDC-style data
        // records temperatures in tenths of a degree Celsius
        int airTemperature = Integer.parseInt(line.substring(13, 19).trim());
        if (Math.abs(airTemperature) != MISSING) {  // skip +/-9999 readings
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
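The column offsets above (year in 0-3, temperature in 13-18) are inferred from the substring calls; a quick standalone check of that parsing, using a synthetic record line, might look like this:

// Standalone sanity check of TemperatureMapper's parsing logic (not part of
// the MapReduce job). The sample line is synthetic, built only to match the
// assumed layout: "1959" + 9 filler characters + a 6-character temperature.
public class ParseCheck {
    private static final int MISSING = 9999;

    public static void main(String[] args) {
        String line = "1959xxxxxxxxx -0042";           // synthetic 19-char record
        String year = line.substring(0, 4);            // "1959"
        int temp = Integer.parseInt(line.substring(13, 19).trim()); // -42
        if (Math.abs(temp) != MISSING) {
            System.out.println(year + "\t" + temp);    // prints: 1959    -42
        }
    }
}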
TemperatureReducer.java
package cn.kissoft.hadoop.week05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;
        // Single pass over all temperatures recorded for this year
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
            minValue = Math.min(minValue, value.get());
        }
        // Emit two records per year: the maximum first, then the minimum
        context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(minValue));
    }
}
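Because the maximum of per-split maxima is the global maximum (and likewise for minima), this reducer can also safely double as a combiner: the reducer would then see each split's local max and min instead of every raw reading, cutting shuffle traffic. The counters in the log below (Combine input records=0) show no combiner was configured. A sketch of the one-line addition to the driver:

// Optional: run the reducer as a combiner on the map side. Safe here because
// taking max/min of partial max/min values gives the same answer, and the
// combiner's output types (Text, IntWritable) match the reducer's input types.
job.setCombinerClass(TemperatureReducer.class);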
Run log
[wukong@bd11 guide]$ hadoop jar pc.jar cn.kissoft.hadoop.week05.TemperatureMR ./ch02/1959.txt ./ch02/out/
Warning: $HADOOP_HOME is deprecated.
14/08/15 16:29:32 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/08/15 16:29:32 INFO input.FileInputFormat: Total input paths to process : 1
14/08/15 16:29:32 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/15 16:29:32 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/15 16:29:34 INFO mapred.JobClient: Running job: job_201408151617_0003
14/08/15 16:29:35 INFO mapred.JobClient: map 0% reduce 0%
14/08/15 16:29:47 INFO mapred.JobClient: map 100% reduce 0%
14/08/15 16:30:00 INFO mapred.JobClient: map 100% reduce 100%
14/08/15 16:30:04 INFO mapred.JobClient: Job complete: job_201408151617_0003
14/08/15 16:30:04 INFO mapred.JobClient: Counters: 29
14/08/15 16:30:04 INFO mapred.JobClient: Job Counters
14/08/15 16:30:04 INFO mapred.JobClient: Launched reduce tasks=1
14/08/15 16:30:04 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=14989
14/08/15 16:30:04 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
14/08/15 16:30:04 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
14/08/15 16:30:04 INFO mapred.JobClient: Launched map tasks=1
14/08/15 16:30:04 INFO mapred.JobClient: Data-local map tasks=1
14/08/15 16:30:04 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=12825
14/08/15 16:30:04 INFO mapred.JobClient: File Output Format Counters
14/08/15 16:30:04 INFO mapred.JobClient: Bytes Written=19
14/08/15 16:30:04 INFO mapred.JobClient: FileSystemCounters
14/08/15 16:30:04 INFO mapred.JobClient: FILE_BYTES_READ=9180486
14/08/15 16:30:04 INFO mapred.JobClient: HDFS_BYTES_READ=27544475
14/08/15 16:30:04 INFO mapred.JobClient: FILE_BYTES_WRITTEN=13886908
14/08/15 16:30:04 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=19
14/08/15 16:30:04 INFO mapred.JobClient: File Input Format Counters
14/08/15 16:30:04 INFO mapred.JobClient: Bytes Read=27544368
14/08/15 16:30:04 INFO mapred.JobClient: Map-Reduce Framework
14/08/15 16:30:04 INFO mapred.JobClient: Map output materialized bytes=4590240
14/08/15 16:30:04 INFO mapred.JobClient: Map input records=444264
14/08/15 16:30:04 INFO mapred.JobClient: Reduce shuffle bytes=4590240
14/08/15 16:30:04 INFO mapred.JobClient: Spilled Records=1251882
14/08/15 16:30:04 INFO mapred.JobClient: Map output bytes=3755646
14/08/15 16:30:04 INFO mapred.JobClient: Total committed heap usage (bytes)=218865664
14/08/15 16:30:04 INFO mapred.JobClient: CPU time spent (ms)=6280
14/08/15 16:30:04 INFO mapred.JobClient: Combine input records=0
14/08/15 16:30:04 INFO mapred.JobClient: SPLIT_RAW_BYTES=107
14/08/15 16:30:04 INFO mapred.JobClient: Reduce input records=417294
14/08/15 16:30:04 INFO mapred.JobClient: Reduce input groups=1
14/08/15 16:30:04 INFO mapred.JobClient: Combine output records=0
14/08/15 16:30:04 INFO mapred.JobClient: Physical memory (bytes) snapshot=322985984
14/08/15 16:30:04 INFO mapred.JobClient: Reduce output records=2
14/08/15 16:30:04 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1455579136
14/08/15 16:30:04 INFO mapred.JobClient: Map output records=417294
Result
[wukong@bd11 guide]$ hadoop fs -cat ./ch02/out/part-r-00000
Warning: $HADOOP_HOME is deprecated.
1959 418
1959 -400
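Assuming the tenths-of-a-degree convention of NCDC-style data noted in the mapper, these two records read as a 1959 maximum of 41.8°C and a minimum of -40.0°C.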