1. Counter
1.1 内建计数器(Built-in Counters)
1.11 Task counters
每一个Built-in计数器组要么包含一个task counters (for task progresses) 或者是job counters (for progresses).
Task Counter在每一个map或reduce中收集,定期的发给task Tracker, 然后发到job tracker.
MapReduce task counters
Filesystem counters
FileInputFormat counters
FileOutputFormat counters
Job counters
1.2. 自定义计数器(User-Defined Counters)
context.getCounter(Temperature.MISSING).increment(1);
1.2.1 动态计数器(Dynamic counters)
public void incrCounter(String group, String counter, long amount)
1.2.2 Readable counter names
配置一个property文件,放在counter所在类的同级目录.
命名方法: 使用下划线分隔不同的类,比如 MyWordCount_BadRecords.properties
本地化居然也可以:MyWordCount_BadRecords_zh_CN.properties
CounterGroupName=Air Temperature Records
MISSING.name=Missing
MALFORMED.name=Malformed
1.2.3 Retrieving counters
job激活数,mapred.jobtracker.completeuserjobs.maximum,默认100,超过的会被清除掉,所以job有可能空
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));
Counters counters = job.getCounters();
long missing = counters.findCounter(MaxTemperatureWithCounters.Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
1.3 User-Defined Streaming Counters
ignore for Streaming topic.
2. Sorting
2.1 Partial Sort
我们知道shuffle会对key进行排序,然后在产生的每个partitionzhong,key都是排序的,只不过这个排序不是全局的,所以叫Partial Sort. 如果数据集比较小,可以使用merge和sort命令,如果大的结果集,请看下面Total Sort.
当然了,如果结果写入SequenceFile或MapFile,MR提供了Sort方法进行排序和索引。
对于一个job,对sort的控制有如下步骤:
1.job.setSortComparatorClass(),这个可以提供一个类来特化Key的排序规则。
2.如果上面1没设置,key必须是WritableComparable的子类,因为必须实现compareTo方法。
3.如果2种的类,没有注册RawComparator,那么RawComparator将使用Ser/De来构造对象,然后调用WritableComparable的compareTo方法。这当然是效率低的,所以实现自己的RawComparator
对效率提升很有必要,尤其是大对象。
2.2 Total Sort
对于一个Total Sort的需求,一个Naive的方法是只指定一个Reduce,这样自然是全局排序的.但是如果结果集比较大,则applicationMaster将负担过重,失去了分布式的优势。
既然Partitioner会造成Partial sort,那么如果,我们能够让Partitioner告诉MapReduce,什么样的值必须去某个Reduce,那不就是Total Sort了吗?
的确,它是可行的。我给这种方法取了名字,叫Ranged Partitioning。
实践中,带来一个问题,就是这个Range的boundary不好取,你没法保证range的均匀性。不均匀带来一个比较不好的性能。
当然全部浏览数据也是不现实的,可行的办法是做Sampling,来预测一个较好的数值分布。好消息是,Hadoop已经提供InputSampler.Sampler接口,和一些有用的实现. 这里我隆重介绍InputSampler和TotalSortPartitioner.
2.2.1 InputSampler
InputSampler类的结构如下,通过集成Sampler Interface的三种实现(SplitSampler,RandomSampler和IntervalSampler),当然你也可以自己写自己的Sampler.
/**
* Utility for collecting samples and writing a partition file for
* {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
*/
public class InputSampler<K,V> implements Tool {
/**
*采样器接口
*/
public interface Sampler<K,V> {
/**
* 从输入数据几种获得一个数据采样的子集,然后通过这些采样数据在Map端由
* TotalOrderPartitioner对处理数据做hash分组,以保证不同Reduce处理数据的有序性。
* 该方法的具体采样逻辑由继承类实现。
* For a given job, collect and return a subset of the keys from the
* input data.
*/
K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
}
/**
* 分片数据采样器,即从N个分片中采样,效率最高
* Samples the first n records from s splits.
* Inexpensive way to sample random data.
*/
public static class SplitSampler<K,V> implements Sampler<K,V> {
...
}
/**
* 通用的随机数据采样器,按一定的频率对所有数据做随机采样,效率很低,
* 但是对于分布不可预测的数据可能效果比较好.
* Sample from random points in the input.
* General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
* each split.
*/
public static class RandomSampler<K,V> implements Sampler<K,V> {
...
}
/**
* 有固定采样间隔的数据采样器,适合有序的数据集,效率较随机数据采样器要好一些
* Sample from s splits at regular intervals.Useful for sorted data.
*/
public static class IntervalSampler<K,V> implements Sampler<K,V> {
}
}
我们先不管Sampler的具体抽样细节,从外围看都是产生一个系列的K[] = getSample(),这个key数组提供给TotalSortPartitioner来做Boundary。Sampler一般都是获取所需要的splits进行split级别的sampling,然后在record级别进行sampling. 当然不同的sampling策略,会影响sampling的效果和数据分布.这也是上面三个不同实现的重点部分.
题记: 这里发点负能量的评论. 看hadoop看到这个sampler,我真心觉得有点蛋疼:
1. 这个sampler是基于mapper输入的,而不是基于输出,这也是为什么很多人抱怨的原因,这尼玛怎么这么stupid呢,难道不是输出才需要作partitioning吗?
2. 这个sampler是基于本地的,也就是有网络io+磁盘io的,居然没有map-reduce版本的sampling吗,开始写的时候,没有灵活设计吗?
3. 我用cdh5.1测试,发现_partition.lst不能产生,产生也不能产生预想的结果。在这里浪费了几天时间,无奈暂时撤兵,后来再看。
对这个部分的实现不敢恭维,以后没人改,我写一个看看。。
2.2.2 TotalSortPartitioner
TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。
2.3 Secondary Sort
所谓Secondary Sort,就是对于k-v有多个排序纬度,比如wordcount中,第一排序字段是count倒序,第二排序字段是word字母顺序,这样的需求无法以来单一key的shuffle自然排序来完成,所以必须建立自定义的WritableComparable,并依次提供Partitioner,GroupingSortComparator,Sort:
- Partitioner,可以使需要grouping的kv输出到同一个Reducer
- Grouping, setGroupingComparatorClass
- Sort, setSortComparatorClass
3. Joins
3.1 Map Side Join
所谓MapSide Join简单而言就是Join数据连接操作发生在Map阶段,一般没有Reducer,也就是说没有Shuffle过程,其自然是非常高效的.这里有一个概念Side Data,是说这种类型的Join,适合一个大数据集和一个小数据集的连接,其中小数据集小道比较容易存放在一个机器的内存中.那么这个小数据集称为SideData.
实践中,有这么几种方法来存储SideData:
- Configuration参数存储,如果SideData是非常小的,可以使用序列化+压缩的办法,把数据存储在Job配置中。缺点是:这种办法会显著增加jobtraker和tasktracker进程的内存,浪费读取时间。
- Distribute Cache存储,简单说就是把数据存储在某固定存储上,一般用HDFS文件存储,HBase,MongoDB等。特点是在Mapper的setup节点,读取这些文件。Hadoop特别提供了-file,可以以逗号添加多个文件,可以来自于本地,hdfs,S3等。
问题: 如果是大文件,无法在单机内存加载怎么办呢?书上推荐MapFile格式存储,可以基于Map特征进行 FileAPI 级别的检索数据,只加载命中数据。HBase等也是一个选择,如果有的话。
3.2 Reduce Side Join
所谓ReduceSide Join是Join发生在Reduce阶段,其模型是比较简单的,过程如下:
a. Map输入,这里一般多个Input,无论是否使用MutlipleInput或CompositeInputFormat,总之,Join的On字段,将作为key输出。
如:k1->v1, k1->v2
b. Reduce合并,把同一个key的Values,Join在一起:
如:k1,v1,v2
c. 笛卡尔乘积, 上面的如果是1-1,那么OK。如果是1-n或者m-n怎么处理呢?答案是进行Value和文件名的关联,这样在Reduce的时候,可以进行m*n的笛卡尔乘积。为了避免长文件名编入PairValue,可以对文件名做Hash值,或者做自然数全局索引。
原理如下:file1 join file2, map产出: k1->Pair<f1,v1>, k1->Pair<f2,v2> and k1->Pair<f2,v3>, 那么Reduce时,比较简单做出v1-v2,v1-v3的join处理。
3.3. Side Data Distribution
见MapSide Join
3.4. MapReduce Library Classes
- ChainMapper+ChainReducer,M(Ms)+R(Ms)
- FieldSelectionMapper and FieldSelectionReducer (new API)
- IntSumReducer,LongSumReducer
- MultithreadedMapper (new API)
- TokenCounterMapper
- RegexMapper