I. Introduction to Hadoop Compression
1. The three components of Hadoop
(1) HDFS, the distributed file system
(2) MapReduce, the distributed programming framework
(3) YARN, the resource scheduling framework
2. Hadoop data compression
An MR job moves large volumes of data between its stages. Compression reduces the number of bytes read from and written to the underlying storage (HDFS), and makes better use of network bandwidth and disk space. It is therefore an effective way to save resources and a standard optimization strategy for MR programs: compressing mapper or reducer output with a codec cuts down on disk I/O and shuffle traffic.
3. Basic principles of compression
(1) For compute-intensive jobs, use less compression (the CPU is already the bottleneck).
(2) For I/O-intensive jobs, use more compression (trading CPU cycles for less I/O).
4. Compression formats supported by MR
Format  | Built into Hadoop? | File extension | Splittable?
DEFLATE | Yes                | .deflate       | No
Gzip    | Yes                | .gz            | No
bzip2   | Yes                | .bz2           | Yes
LZO     | No                 | .lzo           | Yes (needs an index)
Snappy  | No                 | .snappy        | No
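Splittability matters for parallelism: a non-splittable compressed file must be read end to end by a single map task, no matter how many HDFS blocks it spans. As a minimal sketch (class name is my own, using only the built-in codecs), the SplittableCompressionCodec marker interface can be checked at runtime:

import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittableCheck {
    public static void main(String[] args) {
        // BZip2Codec implements SplittableCompressionCodec, so a .bz2 input
        // can be split across map tasks; GzipCodec does not
        System.out.println("gzip  splittable: "
                + (new GzipCodec() instanceof SplittableCompressionCodec));   // false
        System.out.println("bzip2 splittable: "
                + (new BZip2Codec() instanceof SplittableCompressionCodec));  // true
    }
}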
5. Codec classes
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec
Gzip    | org.apache.hadoop.io.compress.GzipCodec
bzip2   | org.apache.hadoop.io.compress.BZip2Codec
LZO     | com.hadoop.compression.lzo.LzoCodec
Snappy  | org.apache.hadoop.io.compress.SnappyCodec
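When reading, these classes seldom need to be named directly: Hadoop's CompressionCodecFactory infers the codec from a file's extension. A small sketch (the class name and path are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {
    public static void main(String[] args) {
        // The factory maps registered codecs by their default file extensions
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path("c:/compress1031/in/words.txt.gz"));
        // Prints org.apache.hadoop.io.compress.GzipCodec
        System.out.println(codec.getClass().getName());
    }
}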
6. Compression performance
Codec | Original size | Compressed size | Compression speed | Decompression speed
gzip  | 8.3GB         | 1.8GB           | 17.5MB/s          | 58MB/s
bzip2 | 8.3GB         | 1.1GB           | 2.4MB/s           | 9.5MB/s
LZO   | 8.3GB         | 2.9GB           | 49.3MB/s          | 74.6MB/s
7. Usage
(1) Map-side output compression
// Enable compression of map output (must be set before Job.getInstance(conf),
// since the Job takes a copy of the Configuration)
conf.setBoolean("mapreduce.map.output.compress", true);
// Choose the codec
// conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
(2) Reduce-side (job) output compression
// Enable compression of the final output
FileOutputFormat.setCompressOutput(job, true);
// Choose the codec
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
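The same effect can be had through Configuration properties instead of the FileOutputFormat helpers; a sketch, assuming the Hadoop 2.x property names, to be set before the Job is created just like the map-side settings above:

// Equivalent to the two FileOutputFormat calls above
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
        GzipCodec.class, CompressionCodec.class);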
II. Hadoop Compression in Practice
1. Mapper class
package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // key: byte offset of the line; value: the line itself; context: job context
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();
        // 2. Split on spaces, e.g. "hello hunter"
        String[] words = line.split(" ");
        // 3. Emit a <word,1> pair for each word, e.g. <hello,1> <hunter,1>
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
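One optional refinement, sketched below: the map method above allocates a new Text and a new IntWritable for every word, and because context.write serializes the pair immediately, the same Writable objects can safely be reused across calls to reduce garbage-collection pressure. This is a variant of the method body, not required for correctness:

// Reusable Writables, declared as fields so they outlive a single map() call
private final Text word = new Text();
private final IntWritable one = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    for (String w : value.toString().split(" ")) {
        word.set(w);               // overwrite the buffer instead of allocating
        context.write(word, one);  // the pair is serialized right here, so reuse is safe
    }
}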
2. Reducer class
package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // 1. Count how many times this word appeared
        int sum = 0;
        // 2. Accumulate the counts
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 3. Emit the result
        context.write(key, new IntWritable(sum));
    }
}
3. Driver class
package com.css.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Set up the job configuration
        Configuration conf = new Configuration();
        // Enable map output compression here, BEFORE the Job is created:
        // Job.getInstance(conf) copies the Configuration, so anything set on
        // conf afterwards is not seen by the job
        // conf.setBoolean("mapreduce.map.output.compress", true);
        // Choose the codec
        // conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
        // conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        Job job = Job.getInstance(conf);
        // 2. Set the jar by locating the driver class
        job.setJarByClass(WordCountDriver.class);
        // 3. Register the custom Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the reduce (final) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Enable compression of the job output
        FileOutputFormat.setCompressOutput(job, true);
        // Choose the codec
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // 6. Set the input path and the output path
        FileInputFormat.setInputPaths(job, new Path("c:/compress1031/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/compress1031/out2"));
        // 7. Submit the job and exit with its status
        boolean rs = job.waitForCompletion(true);
        System.exit(rs ? 0 : 1);
    }
}
4. Input file words.txt
I love Beijing
I love China
Beijing is the capital of China
5. Output file names (one per codec choice)
(1) BZip2Codec: part-r-00000.bz2
(2) DefaultCodec: part-r-00000.deflate
(3) GzipCodec: part-r-00000.gz
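To check the compressed result, the output can be read back through the same codec machinery; a minimal sketch for the gzip case (class name is my own, paths are the local ones used above):

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadCompressedOutput {
    public static void main(String[] args) throws Exception {
        String file = "c:/compress1031/out2/part-r-00000.gz";
        // Pick the codec from the file extension
        CompressionCodec codec = new CompressionCodecFactory(new Configuration())
                .getCodec(new Path(file));
        // Decompress on the fly while reading line by line
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                codec.createInputStream(new FileInputStream(file))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // e.g. "Beijing 2"
            }
        }
    }
}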
III. A Custom Compression Tool
1. The compression tool class
package com.css.compress;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        compress("c:/compress1031/intest/test.txt", "org.apache.hadoop.io.compress.DefaultCodec");
        compress("c:/compress1031/intest/test.txt", "org.apache.hadoop.io.compress.BZip2Codec");
        compress("c:/compress1031/intest/test.txt", "org.apache.hadoop.io.compress.GzipCodec");
    }

    // Compress fileName with the codec named by method
    private static void compress(String fileName, String method) throws ClassNotFoundException, IOException {
        // 1. Open the input stream
        FileInputStream fis = new FileInputStream(new File(fileName));
        Class<?> cName = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(cName, new Configuration());
        // 2. Open the output stream, named with the codec's default extension
        FileOutputStream fos = new FileOutputStream(new File(fileName + codec.getDefaultExtension()));
        // 3. Wrap it in a compressing output stream
        CompressionOutputStream cos = codec.createOutputStream(fos);
        // 4. Copy the data across (2MB buffer; don't auto-close the streams)
        IOUtils.copyBytes(fis, cos, 1024 * 1024 * 2, false);
        // 5. Release resources
        fis.close();
        cos.close();
        fos.close();
    }
}
2. Input file name
test.txt
3. Output file names
(1) DefaultCodec: test.txt.deflate
(2) BZip2Codec: test.txt.bz2
(3) GzipCodec: test.txt.gz
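A decompression counterpart to compress() could look like the sketch below (the decompress name and the .decoded suffix are my own); it infers the codec from the file extension via CompressionCodecFactory. Besides the imports already in TestCompress, it needs CompressionCodecFactory, CompressionInputStream, and org.apache.hadoop.fs.Path:

// Sketch of the reverse operation, e.g. test.txt.gz -> test.txt.decoded
private static void decompress(String fileName) throws IOException {
    Configuration conf = new Configuration();
    // 1. Work out the codec from the extension (.deflate, .bz2, .gz, ...)
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(fileName));
    if (codec == null) {
        System.out.println("No codec found for " + fileName);
        return;
    }
    // 2. Wrap the input in a decompressing stream
    CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(fileName)));
    // 3. Write the plain bytes to a new file with the codec extension stripped
    FileOutputStream fos = new FileOutputStream(new File(
            CompressionCodecFactory.removeSuffix(fileName, codec.getDefaultExtension()) + ".decoded"));
    // 4. Copy and close
    IOUtils.copyBytes(cis, fos, 1024 * 1024 * 2, false);
    cis.close();
    fos.close();
}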