MapReduce+Shuffle详解

Stella981
• 阅读 787

之前一直对Hadoop的MapReduce过程知道皮毛,如今特地深入学习了这个过程一下,把我的理解写下来。

以下就是是我读书,看博客,然后根据自己的理解和经验总结出来的。错误的地方,还请大神指出。^_^由衷感谢~

宏观上来说,Hadoop的MapReduce在执行每个作业的时候要经历2个大阶段:Map阶段和Reduce阶段。

下图是官方对整个MapReduce过程的描述图:

MapReduce+Shuffle详解

我们要了解MapReduce的整个过程,还必须对MapReduce的整个架构体系有一定的了解,下图是Hadoop的MapReduce图:

MapReduce+Shuffle详解

在进行这一切之前,我们还必须了解Hadoop是如何分块的。分为两部分:Block和Split。

(1)Block块:数据的划分,这个是真实的物理划分。

数据文件上传到HDFS后,会被分成一块一块的。每一块的大小可以自己配置(hadoop-default.xml中),默认的选项是64MB,一个文件被分成多个64MB的小文件,最后一个可能会小于64MB。

Block是有冗余的,Hadoop为了保证数据的安全,每个Block都会被复制3份,作为备份,分布在3个不同的DataNode中,当一份宕掉,其余的可以即刻补上。同样,这个备份的数量也是可以配置的,具体配置自己百度。

(2)Split块:Split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。

Split划分是由InputFormat这个接口来定义的,其中就有一个getSplits方法。

不同的Hadoop版本是由不同的job任务来完成的。早先的版本split是由JobTracker完成的,后来的版本是由JobClient完成的,JobClient划分好后,把split.file写入HDFS中,到时候JobTracker读这个文件,就知道split是如何划分的了。

举例了解Split的划分:

File1 :   Block11 ,   Block12 ,   Block13 ,   Block14 ,   Block15 
File2 :   Block21 ,   Block22 ,   Block23

File1有5个Block,最后一个可能会小于64MB;File2有3个Block。

若用户在程序中指定了map tasks的个数,比如说是2(若不指定,则默认为1),那么在FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(对照源码看看,注意getSplits这个函数里的计量单位是Block个数,而不是Byte个数),然后会计算goalSize=totalSize / numSplits = 4,对于File1,计算一个Split有多少个Block是下面这样计算的:

long  splitSize  =  computeSplitSize ( goalSize ,  minSize ,  blockSize ); 
protected   long  computeSplitSize ( long  goalSize ,   long  minSize ,   long  blockSize )   { 
  return   Math . max ( minSize ,   Math . min ( goalSize ,  blockSize )); 
}

这里的minSize是1(当然也是默认值),计算得出splitSize = 4,代表的是一个Split包含了多少个Block,所以接下来的Split划分是这样的:

Split1 :   Block11 ,   Block12 ,   Block13 ,   Block14 
Split2 :   Block15 
Split3 :   Block21 ,   Block22 ,   Block23

那用户指定的map个数是2,出现了三个Split怎么办?事实上,在JobInProgress里map tasks的个数是根据Splits的长度决定的,用户指定的map个数只是个参考。可以参考JobInProgress:initTasks()的代码:

try   { 
 splits  =   JobClient . readSplitFile ( splitFile ); 
  }   finally   { 
 splitFile . close (); 
  } 
 numMapTasks  =  splits . length ; 
 maps  =   new   TaskInProgress [ numMapTasks ];

如果用户设置了10个map作业,我们的splitSize = 1,那么输入刚才我们相同的数据,得到的split会是8个(每个Block会对应一个Split),map tasks个数同样也是8个,也就是说map tasks的个数是由splits的长度来决定的。

补充:

FileInputFormat默认为文件在HDFS上的每一个Block生成一个对应的FileSplit。那么自然,FileSplit.start就是对应Block在文件中的Offset、FileSplit.length就是对应Block的Length、FileSplit.hosts就是对应Block的Location。

但是可以设置“mapred.min.split.size”参数,使得Split的大小大于一个Block,这时候FileInputFormat会将连续的若干个Block分在一个Split中、也可能会将一个Block分别划在不同的Split中(但是前提是一个Split必须在一个文件中)。这里面的情况就说明了Split、Block 的对应关系是一对一、多对一、一对多都是可能的。

而如果使用CombineFileInputFormat(implements InputFormat),将若干个Split打包成一个,避免过多的Map任务(因为Split的数目决定了Map的数目)的话也是可以理解成一对一一对多,但是不会是多对一的情况的。

Split的一点小结论:

1)一个Split不会包含两个File的Block,不会跨越File边界。

2)map tasks的个数最终决定于splits的长度。

3)Split的划分方法完全由用户自己决定。

4)split的多少决定了Map Task的数目,因为每个split会交给一个Map Task处理。

下面附上一张Block和Split的对应关系图:

MapReduce+Shuffle详解

还有一点需要说明,在FileSplit类中,有一项是private String[] hosts;

看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。

比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其剩余备份所在的机器列表,也就是说Block12,Block13,Block14存在哪些机器上没有在FileSplit中记录。

FileSplit中的这个属性有利于调度作业时候的数据本地性问题。如果一个tasktracker前来索取task,jobtracker就会找个task给他,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。

但总之,只能牵就一个Block,其他Block就从网络上传吧。

接下来,我们进入正题:Map端的数据处理。

一、Map

Map下分成了4个子阶段:

【1】从磁盘上读取数据

【2】执行map函数

【3】combine结果

【4】将结果写到本地的磁盘上

Map Task的大致执行过程如下图1所示,Map Task先将对应的split迭代解析成一个个key/value对,然后依次调用用户定义的map()函数进行处理,最终将临时结果存放到本地磁盘上,其中,临时数据被分成若干个partition,每个partition将被一个Reduce Task处理。

MapReduce+Shuffle详解

图1

Map端数据的处理详细过程如下图2:

MapReduce+Shuffle详解

图2

上图的Mapper进行更加详细的分解如下图3:

MapReduce+Shuffle详解

图3

上面的图2是从大神的Blog中截取下来的图,是Map过程的一个大致步骤,我们一一进行讲解:

1)map tasks的Input来自于HDFS的Block,在MapReduce的概念中,是InputSplit。然后运行map task,map task先将对应的split迭代解析成一个个的key/value对,默认的规则是把每一行的文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容,依次调用用户自定义的map()函数即Mapper进行处理。

2)在经过Mapper的运行后,Mapper的输出(更多的key/value对)会按照一定的规则进行partition(分区), MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。partition(分区)的 比较是基于key进行的。比如,我们的key表示省份(如北京、上海等),那么就可以按照不同的省份进行partition,同一个省份的key/value被划分到一个区中。 默认的是只有一个分区 。 分区的数量就是Reducer任务运行的数量 。同样,默认的只有一个Reducer任务。

具体的partition解析,请看网上大神的文章:

http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html

http://blog.csdn.net/chjjunking/article/details/6746762

http://blog.sina.com.cn/s/blog_6b1ff7650101imzp.html

3) map task的输出到partition处理完成之后,将数据缓冲到内存中, 缓冲区的作用是批量收集map结果, 并且不是直接写到磁盘中,缓存的好处就是减少磁盘的I/O开销,提高合并(combine)和排序(sort)的速度。 我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 默认的内存缓冲是100M(可以配置)。 当缓冲的内存大小使用超过一定的阈值(默认是80%(buffer size * spill percent = 100MB * 0.8 = 80MB)),一个后台的线程就会启动把缓冲区中的数据写入(Spill)到磁盘中,往内存中写入的线程会继续执行直到缓冲区满,缓冲区满后线程阻塞直至缓冲区被清空。

在Spill线程启动后,需要对这80MB空间内的key/value对做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。首先,按照key进行排序,对于key相同的key/value对,按照value进行排序。比如:三个key/value对:<2,2>、<1,3>、<2,1>,它们的键值都是整数。那么排序(Sort)后的结果是<1,3>、<2,1>、<2,2>。

Spill过程一个很重要的细节就在于,如果有很多的key/value对需要发送到Reduce端去,那么需要将这些key/value对的value拼接(combine处理,也叫规约处理)到一块去,减少partition相关的索引记录。

如果用户设置过Combiner,那么现在就是使用Combiner的时候了,如果没有的话,直接就会输出到本地的磁盘中。(默认是没有的)。

Combine(规约)处理,其实本质上就是Reduce处理。key相同的key/value对会调用一次reduce方法。经过这一阶段,数据量会减少,比如WordCount中,就会将具有相同的key的key/value对的value加起来,减少Spill到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以在MapReduce中会多次使用。因为,Combiner的输出是Reducer的输入,Combiner绝对不能改变最终的计算结果,所以Combiner只应该用于那种Reducer的输入key/value与输出的key/value类型完全一致,且不影响最终结果的场景。

4)Sort和Combine都是在Spill过程中进行的,这一点要搞清楚。

每次的Spill都会在本地磁盘上生成一个spill文件,如果map的输出结果很大,多次进行了Spill过程,磁盘上就会相应的有多个spill文件存在。当map task真正完成时,内存缓冲区中的数据也全部Spill到本地磁盘上形成了一个spill文件。最终的磁盘上会至少有一个这样的spill文件存在(当map的输出结果很少的时候,当map执行完成后,只会产生一个spill文件),因为最终的文件只有一个,所以需要将这些spill文件归并到一起,这个过程叫做Merge。

Merge是怎样的呢?例如,WordCount中,“Hello”这个单词,从某个map task生成的spill文件中读取过来时值是5,从另外一个map task生成的spill文件中读取过来时,值是8,因为它们的key相同,所以得merge成group。那么又出来一个概念,group,group就像是这样的:{"Hello", [5,8,2,...]},数组中的值就是从不同的spill文件中读取出来的,然后再把这些值加起来。这个地方请注意一下,因为merge是将多个spill文件合并到一个文件,所以可能也会有相同的key存在,在这个时候,如果之前用户设置过Combiner,就会直接使用Combiner来合并相同的key了。

参考文章:

http://langyu.iteye.com/blog/992916

http://my.oschina.net/itblog/blog/275294?fromerr=g7JcUvIV

http://www.cnblogs.com/zhangcm/archive/2012/11/23/2784495.html

至此,map端的所有工作都已经结束了,最终生成的这个文件也会存放在TaskTracker拿得到的某个本地目录内。每个reduce task不断通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某个TaskTracker上的map task执行完成,Shuffle的后半段过程(Reduce)启动。

二、Reduce

Reduce下也分成了4个子阶段:

【1】从各个map task上读相应的数据(shuffle)?

怎样把map task的输出结果有效地传送到reduce端。也可以这样理解:Shuffle描述着数据从map task输出到reduce task输入的整个过程。

【2】sort

【3】执行reduce函数

【4】将结果写到HDFS中。

Reduce Task执行过程如下图4,这个过程可以分成3个阶段:

(1)从远程节点上读取Map Task中间结果(称为“Shuffle”阶段)

(2)按照key对key/value对进行排序(称为“Sort”阶段)

(3)依次读取<key, value list>,调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上(称为“Reduce”阶段)

MapReduce+Shuffle详解

                                              图4

Shuffle在Reduce端的过程也能够用下图5的3点来概括。

当前的reduce copy数据的前提是它要从JobTracker那里获得有哪些的map task已经执行结束了。

Reducer真正运行之前,所有的时间都是在拉取数据,做merge,并且不断重复此过程。

首先,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件,这其实就是我们重点关注的Shuffle过程在Reduce端的实现。见下图5:

MapReduce+Shuffle详解

图5

下面是Reducer的一个细节图:

MapReduce+Shuffle详解

                                                         图6

下面我们详细描述一下reduce端的Shuffle细节:

1)Copy过程,拉取数据。

Reduce进行启动一些数据copy线程(Fetcher,默认是5个),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件,即map task输出的key/value对。因为map task早已结束,这些文件就归TaskTracker管理在本地的磁盘上了。

2)Merge阶段。

这里的merge和map端的merge动作一样,只是数组中存放的是不同的map端Copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。

需要强调的是,merge有三种形式:

(1)内存到内存

(2)内存到磁盘

(3)磁盘到磁盘

默认情况下第一种情况是不启用的。当内存中的数据量达到一定阈值,就启动内存到磁盘的merge。这个与map端处理数据类似,也是Spill过程。当然,在这个过程中,用户如果设置了Combiner,同样会进行 Combiner(合并)  &  Sort(排序) 的,然后在磁盘中生成了众多的spill文件。第二种merge方式一直在运行,直到没有map端的数据时才会结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个要输入Reducer的文件。

3)生成Reducer的输入文件。

不断地merge之后,最后会生成一个“最终文件”。这个文件可能存在于磁盘上,也可能存在于内存中。对于我们来说,我们希望是这个最终文件是存放在内存中的,可以直接作为Reducer的输入。但是在默认情况下,这个文件是存放在磁盘中的。

这样Reducer的输入文件已经确定了,整个Shuffle才最终结束了。然后接下来的就是Reducer执行,把结果放到HDFS上。

这样整个Reduce端的Shuffle过程就结束了,之后将Shuffle完成后生成的文件输入Reducer,对其已经排序好的key/value对调用reduce()方法。key相同的key/value对调用一次reduce()方法,每次调用会产生零个或者多个key/value对。最终,通过OutputFormat()方法将这些输出的key/value对写入到HDFS文件中。

因为上面我们在讲解Mapper任务和Reducer任务的过程中,很多阶段都出现了key/value对,很容易混淆,下面是对key/value对进行了编号,方便大家对key/value对的变化情况的理解。如下图7所示:

MapReduce+Shuffle详解

图7

上图中,对于Mapper任务的输入的key/value对,定义为key1和value1。在map方法中处理后,输出的key/value对,定义为key2和value2。reduce方法接收key2和value2,处理后,输出key3和value3。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这