Stage boundaries are drawn at shuffle operations: each wide dependency encountered starts a new stage.
A Job is split into several groups of Tasks, and each group is called a Stage, much like a Map Stage and a Reduce Stage. Stage division is described in detail in the RDD paper; briefly, stages come in two kinds, shuffle and result. Correspondingly, Spark has two kinds of tasks: ShuffleMapTask and ResultTask. The output of the first is the data a shuffle needs; the output of the second is the result, and stages are divided on the same basis: all transformations before a shuffle form one stage, and the operations after the shuffle form another. For example, sc.parallelize(1 to 10).foreach(println) involves no shuffle and prints directly, so its tasks are all ResultTasks and there is only one stage. In rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println) the job contains a reduce and therefore a shuffle, so everything before reduceByKey is one stage running ShuffleMapTasks that produce the shuffle data, and from reduceByKey to the end is another stage that outputs the result directly. If a job contains multiple shuffles, each shuffle starts a new stage.
Spark divides the DAG into stages according to the dependencies between RDDs. For a narrow dependency, because the partition-to-partition relationship is deterministic, the partition transformations can be pipelined within the same task, so narrow dependencies are grouped into the same stage; for a wide dependency, the next stage can only start its computation after the parent RDD's shuffle has completed. The task type is called ShuffleMapTask precisely because it has to hand its computation results to the next stage through a shuffle.
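To make the boundary visible from the shell, an RDD's lineage can be printed with toDebugString: each extra level of indentation marks a shuffle, i.e. a stage boundary. A minimal sketch (the input data is illustrative):

val nums = sc.parallelize(1 to 10)
nums.foreach(println)                               // no shuffle: a single stage of ResultTasks

val counts = nums.map(x => (x, 1)).reduceByKey(_ + _)
counts.foreach(println)                             // one shuffle: a ShuffleMapTask stage plus a ResultTask stage
println(counts.toDebugString)                       // the indented ShuffledRDD line marks the stage boundary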
For example:
scala> import java.net.URL
import java.net.URL
scala> val weblogrdd=sc.textFile("hdfs://localhost:9000/spark/log/web.log")
weblogrdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/spark/log/web.log MapPartitionsRDD[99] at textFile at
scala> val bb=weblogrdd.map(_.split(" ")).map(x=>{val url=new URL(x(1));val path=url.getPath().substring(1);(path,x(0))}).map((_,1))
bb: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[104] at map at
scala> val cc=bb.reduceByKey(_+_)
cc: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[105] at reduceByKey at
scala> val dd=cc.groupBy(_._1._1).mapValues(_.toList.sortBy(_._2).reverse.take(2))
dd: org.apache.spark.rdd.RDD[(String, List[((String, String), Int)])] = MapPartitionsRDD[108] at mapValues at
scala> dd.collect
res43: Array[(String, List[((String, String), Int)])] = Array((car,List(((car,a10002),5), ((car,10001),1))), (movie,List(((movie,a10001),5), ((movie,a10002),2))), (book,List(((book,a10001),3), ((book,a10002),1))), (music,List(((music,a10001),2), ((music,a10002),1))), (yule,List(((yule,a10002),4), ((yule,a10001),2))))
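This lineage contains two shuffle boundaries: reduceByKey (the ShuffledRDD[105] above) and groupBy, which re-keys the records by path and shuffles again, so dd.collect should run as three stages. One way to check this from the driver (output omitted here):

println(dd.toDebugString)   // each extra indentation level corresponds to one shuffle, i.e. one stage boundary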
This is the concrete flow of stage division and submission in Spark: the core ideas are dividing stages at wide dependencies and submitting stages recursively.
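The recursive part can be illustrated with a toy model (a deliberately simplified sketch, not the actual DAGScheduler code): a stage is only submitted after every parent stage it depends on across a shuffle boundary has been submitted.

case class Stage(id: Int, parents: List[Stage])

def submitStage(stage: Stage, submitted: scala.collection.mutable.Set[Int]): Unit = {
  if (!submitted(stage.id)) {
    stage.parents.foreach(p => submitStage(p, submitted))   // submit missing parent stages first
    println(s"submitting stage ${stage.id}")
    submitted += stage.id
  }
}

// Example DAG: the result stage 2 depends on the shuffle output of stages 0 and 1.
val s0 = Stage(0, Nil)
val s1 = Stage(1, Nil)
val s2 = Stage(2, List(s0, s1))
submitStage(s2, scala.collection.mutable.Set.empty[Int])    // prints stages 0, 1, 2 in order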
------------------------------------------------------------------------------------------------------------------------------------------
scala> val mm=sc.makeRDD(List(("wang",2),("zhang",20),("wang",52)))
mm: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at
scala> val nn=sc.makeRDD(List(("wang",31),("zhang",25),("wang",88)))
nn: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at
scala> val mn=mm.join(nn)
mn: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[122] at join at
scala> mn.collect
res46: Array[(String, (Int, Int))] = Array((zhang,(20,25)), (wang,(2,31)), (wang,(2,88)), (wang,(52,31)), (wang,(52,88)))
--------------------------------------------------------------------------------------------
scala> val mm=sc.makeRDD(List(("wang",2),("zhang",20),("wang",52)))
mm: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[128] at makeRDD at
scala> val nn=sc.makeRDD(List(("wang",31),("zhang",25),("wang",88)))
nn: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at makeRDD at
scala> val gmm=mm.groupByKey()
gmm: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[130] at groupByKey at
scala> val gnn=nn.groupByKey()
gnn: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[131] at groupByKey at
scala> val gmn=gmm join gnn
gmn: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[134] at join at
scala> gmn.collect
res51: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((zhang,(CompactBuffer(20),CompactBuffer(25))), (wang,(CompactBuffer(2, 52),CompactBuffer(31, 88))))
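The two join examples differ in where the shuffles happen. In the first, mm and nn carry no partitioner, so the join itself has to shuffle both inputs. In the second, gmm and gnn are already ShuffledRDDs hash-partitioned by key by groupByKey; assuming both pick up the same default partitioner, the join becomes a narrow dependency and adds no further shuffle. The two lineages can be compared directly:

println(mn.toDebugString)    // the join itself introduces a shuffle on each input
println(gmn.toDebugString)   // the only shuffles are the two groupByKey steps; the join stays inside one stage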