Spark系列 (七)SparkGraphX下的Pregel方法

Stella981
• 阅读 855

文章目录

  • Pregel框架:

  • 一:Spark GraphX Pregel:

  • 二:Pregel计算过程:

  • Pregel函数源码及各个参数解析:

  • 三:案例:单源最短路径

  • 第一步:调用pregel方法:

  • 第二步:第一次迭代:

  • 第三步:第二次迭代:

  • 第四步:不断迭代,直至所有顶点处于钝化态

  • 案例代码如下:

Pregel框架:

一:Spark GraphX Pregel:

  • Pregel是google提出的用于大规模分布式图计算框架
    • 图遍历(bfs)
    • 单源最短路径(sssp)
    • pageRank计算
  • Pregel的计算有一系列迭代组成
  • Pregel迭代过程
    • 每个顶点从上一个superstep接收入站消息
    • 计算顶点新的属性
    • 在下一个superstep中向相邻的顶点发送消息
    • 当没有剩余消息时,迭代结束

二:Pregel计算过程:

Pregel函数源码及各个参数解析:

def pregel[A: ClassTag](
    // 图节点的初始信息
      initialMsg: A,
    // 最大迭代次数
      maxIterations: Int = Int.MaxValue,
    // 
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

参数

说明

initialMsg

图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息

maxIterations

最大迭代次数

activeDirection

规定了发送消息的方向

vprog

节点调用该消息将聚合后的数据和本节点进行属性的合并

sendMsg

激活态的节点调用该方法发送消息

mergeMsg

如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

三:案例:单源最短路径

首先要清楚关于 顶点 的两点知识:

  1. 顶点 的状态有两种:
    (1)、钝化态【类似于休眠,不做任何事】
    (2)、激活态【干活】

  2. 顶点 能够处于激活态需要有条件:
    (1)、成功收到消息 或者
    (2)、成功发送了任何一条消息

第一步:调用pregel方法:

从5出发,除自身顶点外所有顶点都将接收一条初始消息initialMsg,使所有顶点处于激活态,并将属性改成无穷大。自身顶点为0。

第二步:第一次迭代:

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。

之后只有两条边的信息发送成功了

5—>3(0+8<Double.Infinity , 成功),
5—>6(0+3<Double.Infinity , 成功)

此时只有5,3,6处于激活态了,3,6调用vprog方法,将属性合并。

第三步:第二次迭代:

处于激活态的3,6调用sendMsg方法发送消息。

最后只有3—>2(8+4<Double.Infinity,成功)

此时只有3,2处于激活态,2调用vprog方法将属性合并。

第四步:不断迭代,直至所有顶点处于钝化态

每个顶点的属性,就是顶点5到达各个顶点的最短距离。

案例代码如下:

package com.wyw
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx._
  import org.apache.spark.rdd.RDD
object Pregel {

    //1、创建SparkContext
    val sparkConf = new SparkConf().setAppName("GraphxHelloWorld").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    //2、创建顶点
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String,Int))] = sparkContext.makeRDD(vertexArray)

    //3、创建边,边的属性代表 相邻两个顶点之间的距离
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(2L, 5L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sparkContext.makeRDD(edgeArray)


    //4、创建图(使用aply方式创建)
    val graph1 = Graph(vertexRDD, edgeRDD)

    /* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** */

    //被计算的图中 起始顶点id,初始化把点属性全部换成正无穷
    val srcVertexId = 5L
    val initialGraph = graph1.mapVertices{
      case (vid,(name,age)) =>
        if (vid==srcVertexId)
          0.0
        else
          Double.PositiveInfinity
    }

    //5、调用pregel柯里化函数
    val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(
      Double.PositiveInfinity,
      Int.MaxValue,
      EdgeDirection.Out
    )(
      // 传三个匿名函数参数
      // 我收到消息后与本节点判断
      (vid: VertexId, vd: Double, distMsg: Double) => {
        // 比较两者值
        val minDist = math.min(vd, distMsg)
        println(s"顶点$vid,属性$vd,收到消息$distMsg,合并后的属性$minDist")
        // 把小数据发送出去
        minDist
      },
      // 是不是要向下个点发数据
      (edgeTriplet: EdgeTriplet[Double,PartitionID]) => {
        // 检查起点+权重的值 和终点的值判断,小于才发送
        if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")

          Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
        } else {
          Iterator.empty
        }
      },
      // 多个消息进行判断,取最小的消息发送,每次都处理2个
      (msg1: Double, msg2: Double) => math.min(msg1, msg2)
    )

    //6、输出结果
    //  pregelGraph.triplets.collect().foreach(println)
    //  println(pregelGraph.vertices.collect.mkString("\n"))

    //7、关闭SparkContext
    sparkContext.stop()
    
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
OpenCV访问像素点
三种方法迭代器创建一个Mat::Iterator对象it,通过itMat::begin()来的到迭代首地址,递增迭代器知道itMat::end()结束迭代;while(it!Scr.end<Vec3b()){//(it)00;//蓝色通道置零;
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这