ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark计算引擎,资源调度,任务调度,累加器,广播变量

2022-07-14 22:36:38  阅读:217  来源: 互联网

标签:task String val 算子 累加器 Executor spark 任务调度


Spark

关键词:spark计算引擎,资源调度(申请资源),任务调度(执行task)

累加器,广播变量。

spark计算引擎,资源调度(申请资源),任务调度(执行task)

注:此此流程使用 yarn-client 模式

    1-7 为资源调度(申请资源)
1在本地启动Driver程序
2.向RM申请启动AM
3. AM随机分配一个节点启动AM 
4.启动AM
5.AM向RM申请启动Executor
6.AM分配一批节点启动Executor
7.Executor反向注册给Driver端
      8-最后为任务调度
8.当代码中遇到一个action算子时,开始执行调度任务
9.构建DAG有向无环图
10.DAGSheduler构建宽窄依赖,将DAG有向无环图切分成多个Stage,Stage: 是一组可以并行计算的task
11.将stage以taskSet的形式发送给TaskScheuler
12.TaskScheduler将TaskSet的任务task发送到Executor中去执行。会尽量将task发送到数据所在的节点执行
13.发送task任务到Executor执行

其中还涉及到两个机制

重试机制:

1.如果task执行失败时TaskScheduler重试3次

2.如果还是失败DAGScheduler重试Stage4次

推测机制:

如果spark发现有task执行的很慢,会在发送一个一样的task去竞争

此图很重要

累加器

无累加器时

存在累加器

代码实现

package com.core.day3
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Demo21Accumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo21Accumulator")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    var count = 0

    studentsRDD.foreach(stu => {

      count += 1

      //在算子内部对算子外的一个普通变量进行累加,在算子外面读不到累加的结果
      //因为算子内的代码运行在Executor,算子外面的代码云行在Driver端
      //算子内的变量只是算子外面的一个副本

      //println(s"里面的:$count")
    })
    println(s"外面的:$count")

    /**
     * 累加器
     *
     */
    //1.定义累加器
    val accumulator: LongAccumulator = sc.longAccumulator

    val mapRDD: RDD[String] = studentsRDD.map(stu =>{
      accumulator.add(1)
      stu
    })

    mapRDD.foreach(println)

    println(s"accumulator:${accumulator.value}")
  }
}

广播变量

代码实现

package com.core.day3

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.metrics.source
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

//noinspection SourceNotClosed
object Demo23Broadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    conf.setMaster("local")

    conf.setAppName("wc")

    val sc = new SparkContext(conf)

    /**
     * 广播变量
     * 当在算子内使用算子外的一个比较大的变量时,可以将这个变量广播出去,可以减少变量的副本数
     *
     */

    //读取学生表,以学号为key 构建一个map集合
    val studentMap: Map[String, String] = Source
      .fromFile("data/students.txt")
      .getLines()
      .toList
      .map(stu => {
        val id: String = stu.split(",")(0)
        (id,stu)
      }).toMap

    val scoresRDD: RDD[String] = sc.textFile("data/score.txt", 10)

    println(s"scoresRDD:${scoresRDD.getNumPartitions}")

    /**
     * 关联学生表和分数表
     * 循环分数表,使用学号到学生表的mao集合中查询学生的信息
     *
     */

    /**
     * 将Driver端的一个普通变量广播到Executor端
     *
     */
    val broadCastMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)

    val joinRDD: RDD[(String, String)] = scoresRDD.map(sco => {
      val id: String = sco.split(",")(0)

      //使用学号到学生表中获取学生的信息

      /**
       * 在算子内使用广播变量
       * 1、当第一个task在执行过程中如果使用了广播变量,会向Executor获取广播变量
       * 2、如果Executor中没有这个广播变量,Executor会去Driver端获取
       * 3、如果下一个task再使用到这个广播变量就可以直接用了
       *
       */
      //在算子内获取广播变量
      val map: Map[String, String] = broadCastMap.value
      val studentInfo: String = map.getOrElse(id, "默认值")

      (sco, studentInfo)
    })


    joinRDD.foreach(println)
  }
}

Executor 不仅有线程池,还有blockManager

标签:task,String,val,算子,累加器,Executor,spark,任务调度
来源: https://www.cnblogs.com/atao-BigData/p/16479583.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有