ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

图计算: 使用 Spark Graphx Pregel API 处理分层数据

2021-11-25 16:59:43  阅读:220  来源: 互联网

标签:String Pregel Int value sourceVertex API ._ Spark Any


今天,分布式计算引擎是许多分析、批处理和流应用程序的支柱。Spark提供了许多开箱即用的高级功能(pivot、分析窗口函数等)来转换数据。有时需要处理分层数据或执行分层计算。许多数据库供应商提供诸如“递归 CTE(公用表达式)”或“join” SQL 子句之类的功能来查询/转换分层数据。CTE 也称为递归查询或父子查询。在这篇文章中,我们将看看如何使用 Spark 解决这个问题。

分层数据概述 –

存在分层关系,其中一项数据是另一项的父项。分层数据可以使用图形属性对象模型表示,其中每一行都是一个顶点(节点),连接是连接顶点的边(关系),列是顶点的属性。

顶点和边

一些用例

  • 财务计算 - 子账户一直累积到父账户直至最高账户
  • 创建组织层次结构 - 经理与路径的员工关系
  • 使用路径生成网页之间的链接图
  • 任何类型的涉及链接数据的迭代计算

挑战

在分布式系统中查询分层数据有一些挑战

数据是连接的,但它分布在分区和节点之间。解决这个问题的实现应该针对执行迭代和根据需要移动数据(shuffle)进行优化。
图的深度会随着时间的推移而变化——解决方案应该处理不同的深度,并且不应该强制用户在处理之前定义它。

解决方案

在 spark 中实现 CTE 的方法之一是使用Graphx Pregel API。

什么是 Graphx Pregel API?

Graphx 是用于图形和图形并行计算的 Spark API。图算法本质上是迭代的,顶点的属性取决于它们直接或间接(通过其他顶点连接)连接顶点的属性。Pregel 是由 Google 和 spark graphX 开发的以顶点为中心的图处理模型,它提供了 pregel api 的优化变体。

Pregel API 如何工作?

Pregel API 处理包括执行超级步骤

步骤 0:

将初始消息传递给所有顶点
将值作为消息发送到其直接连接的顶点

步骤 1:

接收来自前面步骤的消息
改变值
将值作为消息发送到其直接连接的顶点
重复 步骤 1 直​​到有消息传递,当没有更多消息传递时停止。

用例的分层数据

下表显示了我们将用于生成自上而下的层次结构的示例员工数据。这里员工的经理由具有 emp_id 值的 mgr_id 字段表示。
测试表
添加以下列作为处理的一部分

Level (Depth)顶点在层次结构中所处的级别
Path层次结构中从最顶层顶点到当前顶点的路径
Root层次结构中最顶层的顶点,当数据集中存在多个层次结构时很有用
Iscyclic如果有坏数据,存在循环关系,然后标记它
Isleaf如果顶点没有父节点,则标记它

代码

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.util.hashing.MurmurHash3

/**
 * Pregel API
 * @author zyh
 */
object PregelTest {

  // The code below demonstrates use of Graphx Pregel API - Scala 2.11+
  // functions to build the top down hierarchy

  //setup & call the pregel api
  //设置并调用pregel api
  def calcTopLevelHierarcy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any,(Int,Any,String,Int,Int))] = {

    // create the vertex RDD
    // primary key, root, path
    val verticesRDD: RDD[(VertexId, (Any, Any, String))] = vertexDF
      .rdd
      .map{x=> (x.get(0),x.get(1) , x.get(2))}
      .map{ x => (MurmurHash3.stringHash(x._1.toString).toLong, ( x._1.asInstanceOf[Any], x._2.asInstanceOf[Any] , x._3.asInstanceOf[String]) ) }

    // create the edge RDD
    // top down relationship
    val EdgesRDD = edgeDF
      .rdd
      .map{x=> (x.get(0),x.get(1))}
      .map{ x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong,"topdown" )}

    // create graph
    val graph = Graph(verticesRDD, EdgesRDD).cache()

    val pathSeperator = """/"""

    // 初始化消息
    // initialize id,level,root,path,iscyclic, isleaf
    val initialMsg = (0L,0,0.asInstanceOf[Any], List("dummy"),0,1)

    // add more dummy attributes to the vertices - id, level, root, path, isCyclic, existing value of current vertex to build path, isleaf, pk
    val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1) )

    val hrchyRDD = initialGraph.pregel(
      initialMsg,
      Int.MaxValue,            // 迭代次数, 设置成当前表示无限迭代下去
      EdgeDirection.Out)(
      setMsg,
      sendMsg,
      mergeMsg)
    
    // build the path from the list
    val hrchyOutRDD = hrchyRDD.vertices.map{case(id,v) => (v._8,(v._2,v._3,pathSeperator + v._4.reverse.mkString(pathSeperator),v._5, v._7 )) }

    hrchyOutRDD
  }

  //改变顶点的值
  def setMsg(vertexId: VertexId, value: (Long,Int,Any,List[String], Int,String,Int,Any), message: (Long,Int, Any,List[String],Int,Int)): (Long,Int, Any,List[String],Int,String,Int,Any) = {

    // 第一次收到的消息是初始化的消息 initialMsg
    println(s"设置值: $value  收到消息:  $message")

    if (message._2 < 1) { //superstep 0 - initialize
      (value._1,value._2+1,value._3,value._4,value._5,value._6,value._7,value._8)
    }
    else if ( message._5 == 1) { // set isCyclic (判断是不是一个环)
      (value._1, value._2, value._3, value._4, message._5, value._6, value._7,value._8)
    } else if ( message._6 == 0 ) { // set isleaf
      (value._1, value._2, value._3, value._4, value._5, value._6, message._6,value._8)
    }
    else { // set new values
      //( message._1,value._2+1, value._3, value._6 :: message._4 , value._5,value._6,value._7,value._8)

      ( message._1,value._2+1, message._3, value._6 :: message._4 , value._5,value._6,value._7,value._8)
    }
  }



  // 将值发送到顶点
  def sendMsg(triplet: EdgeTriplet[(Long,Int,Any,List[String],Int,String,Int,Any), _]): Iterator[(VertexId, (Long,Int,Any,List[String],Int,Int))] = {

    val sourceVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.srcAttr
    val destinationVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.dstAttr

    println(s" 源头: $sourceVertex   目的地:   $destinationVertex")

    // 检查是不是一个死环, 就是 a是b的领导, b是a的领导
    // check for icyclic
    if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1) {

      println(s"存在死环    源头: ${sourceVertex._1}        目的地:  ${triplet.dstId}")

      if (destinationVertex._5 == 0) { //set iscyclic
        Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7)))
      } else {
        Iterator.empty
      }
    }
    else {

      // 判断是不是叶子节点,就是没有子节点的节点,属于叶子节点,根节点不算 ,所以样例数据中的叶子节点是 3,8,10
      if (sourceVertex._7==1) //is NOT leaf
      {
        Iterator((triplet.srcId, (sourceVertex._1,sourceVertex._2,sourceVertex._3, sourceVertex._4 ,0, 0 )))
      }
      else { // set new values
        Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1)))
      }
    }
  }



  // 从所有连接的顶点接收值
  def mergeMsg(msg1: (Long,Int,Any,List[String],Int,Int), msg2: (Long,Int, Any,List[String],Int,Int)): (Long,Int,Any,List[String],Int,Int) = {

    println(s"合并值:   $msg1     $msg2")

    // dummy logic not applicable to the data in this usecase
    msg2
  }


  // Test with some sample data
  def main(args: Array[String]): Unit = {

    // 屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)


    val spark: SparkSession = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[1]")
      .getOrCreate()

    val sc = spark.sparkContext

    // RDD 转 DF, 隐式转换
    import spark.implicits._

    val empData = Array(

      // 测试没有顶级的父节点,会出现空指针异常,构建图的时候,会根据边生成一个为null的顶点
      ("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String])
      , ("EMP002", "Jim", "Lake", "CIO", "EMP001")
      , ("EMP003", "Tim", "Gorab", "MGR", "EMP002")
      , ("EMP004", "Rick", "Summer", "MGR", "EMP002")
      , ("EMP005", "Sam", "Cap", "Lead", "EMP004")
      , ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005")
      , ("EMP007", "Cathy", "Watson", "Dev", "EMP006")
      , ("EMP008", "Samantha", "Lion", "Dev", "EMP007")
      , ("EMP009", "Jimmy", "Copper", "Dev", "EMP007")
      , ("EMP010", "Shon", "Taylor", "Intern", "EMP009")
      // 空指针和顶点数据重复没有关系
      // 空指针和父节点在顶点中找不到有关系 (父顶点为null没有关系,需要父顶点能够在顶点列表中能找到)
      , ("EMP011", "zhang", "xiaoming", "CTO", null)
    )

    // create dataframe with some partitions
    val empDF = sc.parallelize(empData, 3)
      .toDF("emp_id","first_name","last_name","title","mgr_id")
      .cache()

    // primary key , root, path - dataframe to graphx for vertices
    val empVertexDF = empDF.selectExpr("emp_id","concat(first_name,' ',last_name)","concat(last_name,' ',first_name)")

    // parent to child - dataframe to graphx for edges
    val empEdgeDF = empDF.selectExpr("mgr_id","emp_id").filter("mgr_id is not null")

    // call the function
    val empHirearchyExtDF: DataFrame = calcTopLevelHierarcy(empVertexDF,empEdgeDF)
      .map{ case(pk,(level,root,path,iscyclic,isleaf)) => (pk.asInstanceOf[String],level,root.asInstanceOf[String],path,iscyclic,isleaf)}
      .toDF("emp_id_pk","level","root","path","iscyclic","isleaf").cache()

    // extend original table with new columns
    val empHirearchyDF = empHirearchyExtDF.join(empDF , empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk"))
      .selectExpr(
        "emp_id","first_name","last_name",
        "title","mgr_id",
        "level",
        "root",
        "path",
        "iscyclic","isleaf"
      )

    // print
    empHirearchyDF.show()

  }
}

输出

输出结果

任务执行

Spark 作业分解为作业、阶段和任务。由于其迭代性质,Pregel API 在内部生成多个作业。每次将消息传递到顶点时都会生成一个作业。由于数据可能位于不同的节点上,因此每个作业可能会以多次 shuffle 结束。

需要注意的是在处理大型数据集时创建的长 RDD 谱系。
执行流程

概括

Graphx Pregel API 非常强大,可用于解决迭代问题或任何图形计算。

标签:String,Pregel,Int,value,sourceVertex,API,._,Spark,Any
来源: https://blog.csdn.net/qq_36039236/article/details/121541138

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有