之前一直对Hadoop的MapReduce过程知道皮毛,如今特地深入学习了这个过程一下,把我的理解写下来。
以下就是是我读书,看博客,然后根据自己的理解和经验总结出来的。错误的地方,还请大神指出。^_^由衷感谢~
宏观上来说,Hadoop的MapReduce在执行每个作业的时候要经历2个大阶段:Map阶段和Reduce阶段。
下图是官方对整个MapReduce过程的描述图:
我们要了解MapReduce的整个过程,还必须对MapReduce的整个架构体系有一定的了解,下图是Hadoop的MapReduce图:
在进行这一切之前,我们还必须了解Hadoop是如何分块的。分为两部分:Block和Split。
(1)Block块:数据的划分,这个是真实的物理划分。
数据文件上传到HDFS后,会被分成一块一块的。每一块的大小可以自己配置(hadoop-default.xml中),默认的选项是64MB,一个文件被分成多个64MB的小文件,最后一个可能会小于64MB。
Block是有冗余的,Hadoop为了保证数据的安全,每个Block都会被复制3份,作为备份,分布在3个不同的DataNode中,当一份宕掉,其余的可以即刻补上。同样,这个备份的数量也是可以配置的,具体配置自己百度。
(2)Split块:Split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。
Split划分是由InputFormat这个接口来定义的,其中就有一个getSplits方法。
不同的Hadoop版本是由不同的job任务来完成的。早先的版本split是由JobTracker完成的,后来的版本是由JobClient完成的,JobClient划分好后,把split.file写入HDFS中,到时候JobTracker读这个文件,就知道split是如何划分的了。
举例了解Split的划分:
File1 : Block11 , Block12 , Block13 , Block14 , Block15
File2 : Block21 , Block22 , Block23
File1有5个Block,最后一个可能会小于64MB;File2有3个Block。
若用户在程序中指定了map tasks的个数,比如说是2(若不指定,则默认为1),那么在FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(对照源码看看,注意getSplits这个函数里的计量单位是Block个数,而不是Byte个数),然后会计算goalSize=totalSize / numSplits = 4,对于File1,计算一个Split有多少个Block是下面这样计算的:
long splitSize = computeSplitSize ( goalSize , minSize , blockSize );
protected long computeSplitSize ( long goalSize , long minSize , long blockSize ) {
return Math . max ( minSize , Math . min ( goalSize , blockSize ));
}
这里的minSize是1(当然也是默认值),计算得出splitSize = 4,代表的是一个Split包含了多少个Block,所以接下来的Split划分是这样的:
Split1 : Block11 , Block12 , Block13 , Block14
Split2 : Block15
Split3 : Block21 , Block22 , Block23
那用户指定的map个数是2,出现了三个Split怎么办?事实上,在JobInProgress里map tasks的个数是根据Splits的长度决定的,用户指定的map个数只是个参考。可以参考JobInProgress:initTasks()的代码:
try {
splits = JobClient . readSplitFile ( splitFile );
} finally {
splitFile . close ();
}
numMapTasks = splits . length ;
maps = new TaskInProgress [ numMapTasks ];
如果用户设置了10个map作业,我们的splitSize = 1,那么输入刚才我们相同的数据,得到的split会是8个(每个Block会对应一个Split),map tasks个数同样也是8个,也就是说map tasks的个数是由splits的长度来决定的。
补充:
FileInputFormat默认为文件在HDFS上的每一个Block生成一个对应的FileSplit。那么自然,FileSplit.start就是对应Block在文件中的Offset、FileSplit.length就是对应Block的Length、FileSplit.hosts就是对应Block的Location。
但是可以设置“mapred.min.split.size”参数,使得Split的大小大于一个Block,这时候FileInputFormat会将连续的若干个Block分在一个Split中、也可能会将一个Block分别划在不同的Split中(但是前提是一个Split必须在一个文件中)。这里面的情况就说明了Split、Block 的对应关系是一对一、多对一、一对多都是可能的。
而如果使用CombineFileInputFormat(implements InputFormat),将若干个Split打包成一个,避免过多的Map任务(因为Split的数目决定了Map的数目)的话也是可以理解成一对一一对多,但是不会是多对一的情况的。
Split的一点小结论:
1)一个Split不会包含两个File的Block,不会跨越File边界。
2)map tasks的个数最终决定于splits的长度。
3)Split的划分方法完全由用户自己决定。
4)split的多少决定了Map Task的数目,因为每个split会交给一个Map Task处理。
下面附上一张Block和Split的对应关系图:
还有一点需要说明,在FileSplit类中,有一项是private String[] hosts;
看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。
比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其剩余备份所在的机器列表,也就是说Block12,Block13,Block14存在哪些机器上没有在FileSplit中记录。
FileSplit中的这个属性有利于调度作业时候的数据本地性问题。如果一个tasktracker前来索取task,jobtracker就会找个task给他,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。
但总之,只能牵就一个Block,其他Block就从网络上传吧。
接下来,我们进入正题:Map端的数据处理。
一、Map
Map下分成了4个子阶段:
【1】从磁盘上读取数据
【2】执行map函数
【3】combine结果
【4】将结果写到本地的磁盘上
Map Task的大致执行过程如下图1所示,Map Task先将对应的split迭代解析成一个个key/value对,然后依次调用用户定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中,临时数据被分成若干个partition,每个partition将被一个Reduce Task处理。
图1
Map端数据的处理详细过程如下图2:
图2
上图的Mapper进行更加详细的分解如下图3:
图3
上面的图2是从大神的Blog中截取下来的图,是Map过程的一个大致步骤,我们一一进行讲解:
1)map tasks的Input来自于HDFS的Block,在MapReduce的概念中,是InputSplit。然后运行map task,map task先将对应的split迭代解析成一个个的key/value对,默认的规则是把每一行的文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容,依次调用用户自定义的map()函数即Mapper进行处理。
2)在经过Mapper的运行后,Mapper的输出(更多的key/value对)会按照一定的规则进行partition(分区), MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。partition(分区)的 比较是基于key进行的。比如,我们的key表示省份(如北京、上海等),那么就可以按照不同的省份进行partition,同一个省份的key/value被划分到一个区中。 默认的是只有一个分区 。 分区的数量就是Reducer任务运行的数量 。同样,默认的只有一个Reducer任务。
具体的partition解析,请看网上大神的文章:
http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html
http://blog.csdn.net/chjjunking/article/details/6746762
http://blog.sina.com.cn/s/blog_6b1ff7650101imzp.html
3) map task的输出到partition处理完成之后,将数据缓冲到内存中, 缓冲区的作用是批量收集map结果, 并且不是直接写到磁盘中,缓存的好处就是减少磁盘的I/O开销,提高合并(combine)和排序(sort)的速度。 我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 默认的内存缓冲是100M(可以配置)。 当缓冲的内存大小使用超过一定的阈值(默认是80%(buffer size * spill percent = 100MB * 0.8 = 80MB)),一个后台的线程就会启动把缓冲区中的数据写入(Spill)到磁盘中,往内存中写入的线程会继续执行直到缓冲区满,缓冲区满后线程阻塞直至缓冲区被清空。
在Spill线程启动后,需要对这80MB空间内的key/value对做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。首先,按照key进行排序,对于key相同的key/value对,按照value进行排序。比如:三个key/value对:<2,2>、<1,3>、<2,1>,它们的键值都是整数。那么排序(Sort)后的结果是<1,3>、<2,1>、<2,2>。
Spill过程一个很重要的细节就在于,如果有很多的key/value对需要发送到Reduce端去,那么需要将这些key/value对的value拼接(combine处理,也叫规约处理)到一块去,减少partition相关的索引记录。
如果用户设置过Combiner,那么现在就是使用Combiner的时候了,如果没有的话,直接就会输出到本地的磁盘中。(默认是没有的)。
Combine(规约)处理,其实本质上就是Reduce处理。key相同的key/value对会调用一次reduce方法。经过这一阶段,数据量会减少,比如WordCount中,就会将具有相同的key的key/value对的value加起来,减少Spill到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以在MapReduce中会多次使用。因为,Combiner的输出是Reducer的输入,Combiner绝对不能改变最终的计算结果,所以Combiner只应该用于那种Reducer的输入key/value与输出的key/value类型完全一致,且不影响最终结果的场景。
4)Sort和Combine都是在Spill过程中进行的,这一点要搞清楚。
每次的Spill都会在本地磁盘上生成一个spill文件,如果map的输出结果很大,多次进行了Spill过程,磁盘上就会相应的有多个spill文件存在。当map task真正完成时,内存缓冲区中的数据也全部Spill到本地磁盘上形成了一个spill文件。最终的磁盘上会至少有一个这样的spill文件存在(当map的输出结果很少的时候,当map执行完成后,只会产生一个spill文件),因为最终的文件只有一个,所以需要将这些spill文件归并到一起,这个过程叫做Merge。
Merge是怎样的呢?例如,WordCount中,“Hello”这个单词,从某个map task生成的spill文件中读取过来时值是5,从另外一个map task生成的spill文件中读取过来时,值是8,因为它们的key相同,所以得merge成group。那么又出来一个概念,group,group就像是这样的:{"Hello", [5,8,2,...]},数组中的值就是从不同的spill文件中读取出来的,然后再把这些值加起来。这个地方请注意一下,因为merge是将多个spill文件合并到一个文件,所以可能也会有相同的key存在,在这个时候,如果之前用户设置过Combiner,就会直接使用Combiner来合并相同的key了。
参考文章:
http://langyu.iteye.com/blog/992916
http://my.oschina.net/itblog/blog/275294?fromerr=g7JcUvIV
http://www.cnblogs.com/zhangcm/archive/2012/11/23/2784495.html
至此,map端的所有工作都已经结束了,最终生成的这个文件也会存放在TaskTracker拿得到的某个本地目录内。每个reduce task不断通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某个TaskTracker上的map task执行完成,Shuffle的后半段过程(Reduce)启动。
二、Reduce
Reduce下也分成了4个子阶段:
【1】从各个map task上读相应的数据(shuffle)?
怎样把map task的输出结果有效地传送到reduce端。也可以这样理解:Shuffle描述着数据从map task输出到reduce task输入的整个过程。
【2】sort
【3】执行reduce函数
【4】将结果写到HDFS中。
Reduce Task执行过程如下图4,这个过程可以分成3个阶段:
(1)从远程节点上读取Map Task中间结果(称为“Shuffle”阶段)
(2)按照key对key/value对进行排序(称为“Sort”阶段)
(3)依次读取<key, value list>,调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上(称为“Reduce”阶段)
图4
Shuffle在Reduce端的过程也能够用下图5的3点来概括。
当前的reduce copy数据的前提是它要从JobTracker那里获得有哪些的map task已经执行结束了。
Reducer真正运行之前,所有的时间都是在拉取数据,做merge,并且不断重复此过程。
首先,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件,这其实就是我们重点关注的Shuffle过程在Reduce端的实现。见下图5:
图5
下面是Reducer的一个细节图:
图6
下面我们详细描述一下reduce端的Shuffle细节:
1)Copy过程,拉取数据。
Reduce进行启动一些数据copy线程(Fetcher,默认是5个),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件,即map task输出的key/value对。因为map task早已结束,这些文件就归TaskTracker管理在本地的磁盘上了。
2)Merge阶段。
这里的merge和map端的merge动作一样,只是数组中存放的是不同的map端Copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
需要强调的是,merge有三种形式:
(1)内存到内存
(2)内存到磁盘
(3)磁盘到磁盘
默认情况下第一种情况是不启用的。当内存中的数据量达到一定阈值,就启动内存到磁盘的merge。这个与map端处理数据类似,也是Spill过程。当然,在这个过程中,用户如果设置了Combiner,同样会进行 Combiner(合并) & Sort(排序) 的,然后在磁盘中生成了众多的spill文件。第二种merge方式一直在运行,直到没有map端的数据时才会结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个要输入Reducer的文件。
3)生成Reducer的输入文件。
不断地merge之后,最后会生成一个“最终文件”。这个文件可能存在于磁盘上,也可能存在于内存中。对于我们来说,我们希望是这个最终文件是存放在内存中的,可以直接作为Reducer的输入。但是在默认情况下,这个文件是存放在磁盘中的。
这样Reducer的输入文件已经确定了,整个Shuffle才最终结束了。然后接下来的就是Reducer执行,把结果放到HDFS上。
这样整个Reduce端的Shuffle过程就结束了,之后将Shuffle完成后生成的文件输入Reducer,对其已经排序好的key/value对调用reduce()方法。key相同的key/value对调用一次reduce()方法,每次调用会产生零个或者多个key/value对。最终,通过OutputFormat()方法将这些输出的key/value对写入到HDFS文件中。
因为上面我们在讲解Mapper任务和Reducer任务的过程中,很多阶段都出现了key/value对,很容易混淆,下面是对key/value对进行了编号,方便大家对key/value对的变化情况的理解。如下图7所示:
图7
上图中,对于Mapper任务的输入的key/value对,定义为key1和value1。在map方法中处理后,输出的key/value对,定义为key2和value2。reduce方法接收key2和value2,处理后,输出key3和value3。