Spark Basics: The Spark-Core Core Model


Chapter 2: The Spark-Core Core Model

1. RDD

A Resilient Distributed Dataset (RDD) is the most fundamental data abstraction in Spark. An RDD is:

  • Immutable (read-only)

  • Partitionable

  • Computable in parallel

  • Automatically fault-tolerant

  • Scheduled with location awareness

RDD is Spark's core abstraction; in essence it is an abstract class. The key parts of its source code are shown below:

abstract class RDD[T: ClassTag](
  @transient private var _sc: SparkContext,
  @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
  ......
   /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()
  ......
}

An RDD has five properties that describe the state of the data set:

  • partitions: the data partitions (abstract)
  • compute: the function that computes each partition (abstract)
  • dependencies: the list of dependencies on parent RDDs (abstract)
  • partitioner: how the data is partitioned (concrete, defaults to None)
  • preferredLocations: the preferred locations (concrete, defaults to Nil)

Note that a concrete RDD implementation must provide the first three: compute and getPartitions are abstract, and the dependency list is supplied through the constructor or by overriding getDependencies.

Question 1: do concrete RDD implementations need to override the last two properties?

Some concrete RDD implementation classes:

[Figures: the RDD implementation class hierarchy]

Note that although there are many RDD implementation classes, it is enough to master the five key properties of the abstract RDD.
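
To make these properties concrete, here is a minimal hand-written RDD, a sketch for illustration only (the class RangeNumberRDD and its number-generating logic are invented for this example). It overrides only the two abstract members, getPartitions and compute, passes an empty dependency list to the constructor, and inherits the default partitioner and preferred locations:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each partition only records which sub-range of numbers it covers,
// not the data itself (compare Question 2 below).
case class NumberPartition(index: Int, start: Int, end: Int) extends Partition

// A toy RDD that produces the numbers 0 until n, split into numSlices partitions.
class RangeNumberRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {

  // Property "partitions": how the data set is split.
  override protected def getPartitions: Array[Partition] =
    (0 until numSlices).map { i =>
      NumberPartition(i, i * n / numSlices, (i + 1) * n / numSlices): Partition
    }.toArray

  // Property "compute": how the iterator for one partition is produced.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[NumberPartition]
    (p.start until p.end).iterator
  }
}

Used like any other RDD, new RangeNumberRDD(sc, 10, 3).collect() should return the numbers 0 through 9.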

2. Data Partitions

Viewed from the data's perspective, an RDD is composed of data partitions, and these partitions live on different nodes of the cluster.

  • An RDD can contain multiple partitions
  • A partition is a fragment of the dataset
  • Each partition is wrapped into one Task

[Figure: the internal composition of an RDD as a set of partitions]

The partition source code is as follows:

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int
  // A better default implementation of HashCode
  override def hashCode(): Int = index
  override def equals(other: Any): Boolean = super.equals(other)
}

Question 2: does a partition itself store data? Does an RDD store the actual data?

A partition does not store the actual data.

  1. The Partition class contains an index member that identifies the partition within its RDD;
  2. the RDD id plus the partition index uniquely determines the block id for that partition;
  3. using the interfaces provided by the underlying storage layer,
  4. the data for that partition can then be fetched from the storage medium (e.g. HDFS or memory).

2.1 Building an RDD

2.1.1 Reading external datasets

  • Text files

sc.textFile(path[,minPartitions])
sc.wholeTextFiles(path[,minPartitions])

  • Binary files

sc.binaryFiles(path[,minPartitions])

  • Object files

sc.objectFile[T](path)

  • SequenceFile

sc.sequenceFile(path,keyClass,valueClass[,minPartitions])
sc.sequenceFile[K,V](path[,minPartitions])

  • Hadoop input formats (new API)

sc.newAPIHadoopFile[Text,Text,KeyValueTextInputFormat](path)
sc.newAPIHadoopFile[F](path)

Notes:

  • path can be a single file, a directory, or a path containing wildcards.

  • minPartitions is the default minimum number of partitions for a HadoopRDD when the user does not supply one. Note that it is computed with math.min, so defaultMinPartitions can never be greater than 2.

  • keyClass and valueClass are the key and value types stored in the data file.

These methods are defined in the source as follows:

//1. Text files
def textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
             minPartitions).map(pair => pair._2.toString).setName(path)
}

def wholeTextFiles(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
  //....
}
//2. Binary files
def binaryFiles(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
  //...
}
//3. Object files
def objectFile[T: ClassTag](
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
  assertNotStopped()
  sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
  .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
//4. Sequence files
def sequenceFile[K, V](path: String,
                       keyClass: Class[K],
                       valueClass: Class[V],
                       minPartitions: Int
                      ): RDD[(K, V)] = withScope {
  assertNotStopped()
  val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
  hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}

def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V]): RDD[(K, V)] = withScope {
  assertNotStopped()
  sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
 kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
  withScope {
    assertNotStopped()
    val kc = clean(kcf)()
    val vc = clean(vcf)()
    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
    val writables = hadoopFile(path, format,
                               kc.writableClass(km).asInstanceOf[Class[Writable]],
                               vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
  }
}
//5. Files via the new Hadoop API
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
  newAPIHadoopFile(
    path,
    fm.runtimeClass.asInstanceOf[Class[F]],
    km.runtimeClass.asInstanceOf[Class[K]],
    vm.runtimeClass.asInstanceOf[Class[V]])
}
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
  assertNotStopped()
  //...
}

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}
/**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

2.1.2 Parallelizing a Seq

sc.makeRDD(seq[,numPartition])

sc.parallelize(seq[,numPartition])

Notes:

  • seq is a Seq collection
  • numPartition is the number of partitions (the parallelism)

In the source, makeRDD and parallelize are defined as follows:

def makeRDD[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
def parallelize[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

2.1.3 Examples

  1. Read the file /opt/spark/README.md with both textFile and wholeTextFiles.
object RDDTest1{
  def main(args:Array[String]):Unit={
    //1. Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Example 1")
    //2. Build the SparkContext object
    val sc=new SparkContext(conf);
    sc.setLogLevel("warn")

    //3. Build the RDDs

    //3.1 Read a text file with textFile
    val rdd1=sc.textFile("/opt/spark/README.md")
    println(rdd1.count)
    println(rdd1.first)

    //3.2 Read the same text file with wholeTextFiles
    val rdd2=sc.wholeTextFiles("/opt/spark/README.md")
    println(rdd2.count)
    println(rdd2.first)

    //5. Stop the SparkContext
    sc.stop()
  }
}

Observe: how does the output of the same calls differ between rdd1 and rdd2?

  2. Build an RDD by parallelizing a Seq collection
object RDDTest2{
  def main(args:Array[String]):Unit={

    //1. Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Example 2")
    //2. Build the SparkContext object
    val sc=new SparkContext(conf);
    sc.setLogLevel("warn")

    //3. Build the RDD
    //Parallelize a Seq collection with makeRDD
    val seq=1 to 10
    val rdd1=sc.makeRDD(seq)

    //4.1 Add 1 to every element of rdd1 with map, returning a new RDD
    val rdd2=rdd1.map(x=>x+1)
    //4.2 Print the elements of rdd2 to the console
    rdd2.foreach(println)

    //5. Stop the SparkContext
    sc.stop()

  }
}

Notice that in both examples we had to build a SparkContext, and the code is almost identical each time.

Question 3: can the construction of the SparkContext be encapsulated? If so, how?

It can be encapsulated with a Scala package object:

package com

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark helper utilities
  * that make it easy to build and reuse Spark entry points.
  */
package object briup {

  private var _conf:Option[SparkConf]=None;
  private var _sc:Option[SparkContext]=None;
  private var _spark:Option[SparkSession]=None;
  private var _ssc:Option[StreamingContext]=None;
  implicit val jarFilePath:Option[String]=None;

  /**
    * Build (or reuse) the SparkConf object.
    */
  private def getConf(master:String,appName:String,checkPoint:String="spark-checkpoint"):SparkConf={
    _conf match{
      case Some(conf) => conf
      case None =>
        val conf=new SparkConf()
        conf.setMaster(master)
        conf.setAppName(appName)
        conf.set("spark.sql.streaming.checkpointLocation",checkPoint)
        _conf=Some(conf)
        conf
    }
  }
  
  /**
    * Build (or reuse) the SparkContext object.
    */
  def getSparkContext(master:String,appName:String)(implicit jarFilePath:Option[String]=None):SparkContext={
    _sc match{
      case Some(sc) => sc
      case None =>
        val conf=getConf(master,appName)
        //Option 1: build a new context directly
        //    val sc=new SparkContext(conf);
        //Option 2: reuse an existing context if one already exists
        val sc=SparkContext.getOrCreate(conf);
      
        jarFilePath match {
          case Some(filepath) => sc.addJar(filepath)
          case None =>
        }
        _sc=Some(sc)
        sc.setLogLevel("warn")
        sc
    }
  }
  
  /**
    * Build (or reuse) the SparkSession object.
    */
  def getSpark(master:String,appName:String,checkPoint:String="spark-checkpoint")(implicit jarFilePath:Option[String]):SparkSession={
    _spark match{
      case Some(spark) =>
        //        println("...reusing the existing SparkSession...")
        spark
      case None =>
        //        println("...creating a new SparkSession...")
        val conf=getConf(master,appName)
        val spark=SparkSession.builder().config(conf).getOrCreate();
        jarFilePath match {
          case Some(filepath) => spark.sparkContext.addJar(filepath)
          case None => //println("no jarFilePath provided");
        }
        _spark=Some(spark)
        spark
    }
  }
  
  /**
    * Build (or reuse) the StreamingContext object.
    */
  def getStreamingSpark(master:String,appName:String,batchDur:Duration)(implicit jarFilePath:Option[String]=None):StreamingContext={
    _ssc match{
      case Some(ssc) =>ssc
      case None =>
        val conf=getConf(master,appName)
        val ssc=new StreamingContext(conf,batchDur)
        jarFilePath match {
          case Some(filepath) => ssc.sparkContext.addJar(filepath)
          case None =>  //println("no jarFilePath provided");
        }
        _ssc=Some(ssc)
        ssc
    }
  }
}
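
With this package object in place, the earlier examples shrink to a few lines. A sketch of how Example 1 could be rewritten (the object name is arbitrary, and the file is assumed to live under the com.briup package so that the helpers are in scope):

package com.briup

object RDDTest1Simplified {
  def main(args: Array[String]): Unit = {
    // getSparkContext comes from the briup package object above and
    // reuses an existing SparkContext if one has already been created.
    val sc = getSparkContext("local[*]", "Example 1")

    val rdd1 = sc.textFile("/opt/spark/README.md")
    println(rdd1.count)
    println(rdd1.first)

    sc.stop()
  }
}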

  3. Working with non-text data files

Data path: hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat

Data format: UserID::Gender::Age::Occupation::Zip-code

Implementation:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

object RDDTest3{
  def main(args:Array[String]):Unit={
    //1. Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Example 3 - various file formats")
    //2. Build the SparkContext object
    val sc=SparkContext.getOrCreate(conf)

    //3. Build RDDs + 4. operate on them

    //1. Read the text file into an RDD
    val rdd=sc.textFile("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
    //2. Print it to the console
    rdd.foreach(println)
    //3. Save it as an object file; note that the argument is a directory path
    rdd.saveAsObjectFile("users_obj")
    //4. Read the object file back
    val objectRDD=sc.objectFile[String]("users_obj")
    //5. Print it to the console
    objectRDD.foreach(println)
    //6. Read the same file as a binary file
    val binaryRDD=sc.binaryFiles("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
    //7. Print it to the console
    binaryRDD.foreach(println)
    //8. Save as a SequenceFile
    objectRDD.map(str=>(str.length,str)).saveAsSequenceFile("users_seq")
    //9. Read the SequenceFile back
    val seqRDD=sc.sequenceFile[Int,String]("users_seq")
    //10. Print it to the console
    seqRDD.foreach(println)
    //11. Save in a Hadoop (new API) file format
    seqRDD.map(tu=>(new IntWritable(tu._1),new Text(tu._2))).saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable,Text]]("users_hadoop")
    //12. Read the Hadoop-format file back
    val hadoopRDD=sc.newAPIHadoopFile[IntWritable,Text,SequenceFileInputFormat[IntWritable,Text]]("users_hadoop")
    //13. Print it to the console
    hadoopRDD.foreach(println)

    //5. Stop the SparkContext
    sc.stop()
  }
}

Question 4: now that we know how to build an RDD, how do we check how many partitions it has?

2.2 Number of partitions

Get the number of partitions: rdd.getNumPartitions

Demo:

object RDDTest2{
  def main(args:Array[String]):Unit={
    //1. Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Demo 1 - partition count")
    //2. Build the SparkContext object
    val sc=SparkContext.getOrCreate(conf)

    //3. Build the RDD
    val rdd=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    //3.1 Check the number of partitions of the RDD
    println(rdd.getNumPartitions)

    //5. Stop the SparkContext
    sc.stop()
  }
}

Note that the number of partitions determines the number of Tasks in a Stage; it is the parallelism of Spark's task scheduling.

Question 5: how do we set the number of partitions of an RDD?

  • Specify it when the RDD is created
    • when reading external files, via the optional minPartitions argument
      • if not specified it defaults to math.min(scheduler parallelism, 2)
      • minPartitions is the default minimum number of partitions for a HadoopRDD when the user gives none
    • when parallelizing a Seq, via the optional numPartition argument
      • if not specified it defaults to the scheduler parallelism
      • numPartition is the number of partitions
  • Repartition (adjust the number of partitions of an existing RDD)
    • rdd.repartition(numPartitions:Int)
    • rdd.coalesce(numPartitions:Int,shuffle:Boolean)
    • pairRdd.repartitionAndSortWithinPartitions(partitioner: Partitioner)

The repartitioning methods are defined in the source as follows:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

Question 6: does repartitioning always require a network shuffle?

Note:

  • Of repartition and coalesce, Spark recommends coalesce, the optimized variant of repartition.
  • If the data within each partition needs to be sorted after repartitioning, Spark recommends repartitionAndSortWithinPartitions.

Demo:


object RDDTest2{
  def main(args:Array[String]):Unit={
    //1. Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Demo 2 - partition count")
    //2. Build the SparkContext object
    val sc=SparkContext.getOrCreate(conf)

    //3.1 Specify the number of partitions when building the RDD
    val rdd1=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    println(rdd1.getNumPartitions)

    val rdd2=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat",5)
    println(rdd2.getNumPartitions)

    val rdd3=sc.makeRDD(1 to 10)
    println(rdd3.getNumPartitions)

    val rdd4=sc.makeRDD(1 to 10,4)
    println(rdd4.getNumPartitions)

    //3.2 Adjust the number of partitions by repartitioning
    println(s"before repartition: ${rdd1.getNumPartitions} partitions")
    val re_rdd1=rdd1.repartition(3)
    println(s"after repartition: ${re_rdd1.getNumPartitions} partitions")

    println("----- decreasing the partition count -------")
    println(s"before coalesce: ${rdd2.getNumPartitions} partitions")
    val co_rdd2=rdd2.coalesce(3)
    println(s"after coalesce: ${co_rdd2.getNumPartitions} partitions")

    println("----- increasing the partition count -------")
    println(s"before coalesce: ${rdd2.getNumPartitions} partitions")
    //without shuffle=true, coalesce cannot increase the partition count
    val co_rdd3=rdd2.coalesce(10)
    println(s"after coalesce: ${co_rdd3.getNumPartitions} partitions")

    println(s"before repartitionAndSortWithinPartitions: ${rdd4.getNumPartitions} partitions")
    val partitioner=new HashPartitioner(5)
    val ras_rdd4=rdd4.map(x=>(x,1)).repartitionAndSortWithinPartitions(partitioner)
    println(s"after repartitionAndSortWithinPartitions: ${ras_rdd4.getNumPartitions} partitions")

    //5. Stop the SparkContext
    sc.stop()
  }
}

Question 7: what does the second argument of coalesce mean? Why can coalesce be preferable to repartition?

Reason:

  • repartition always triggers a shuffle, which has a high network cost and lowers performance.
  • When decreasing the partition count, coalesce can avoid the shuffle entirely, saving network overhead and improving efficiency.

Question 8: when increasing the partition count, must coalesce shuffle? Why?

Reason:

  • To decrease the count, several partitions can simply be merged into one larger partition, so no shuffle is needed.
  • To increase the count, the data of a partition must be scattered across several partitions, which requires a shuffle.

3. Partition Computation

In Spark, RDDs are computed one partition at a time; every RDD implements the compute function for this purpose.

/**
  * :: DeveloperApi ::
  * Implemented by subclasses to compute a given partition.
  */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]

Note that compute is a developer API; application code written against Spark is not supposed to call it directly.

4. Dependency List

Before studying dependencies, let's first look at RDD operations.

4.1 RDD operations

4.1.1 Transformations

Transformations are operations that return a new RDD; they are evaluated lazily.

The common transformations are listed below (a short usage sketch follows the list):

  1. Element-wise operations
    • map(func): passes each element through func and returns a new distributed dataset of the results
    • flatMap(func): like map, but each input element can be mapped to 0 or more output elements (so func returns a Seq rather than a single element)
  2. Partition-wise operations
    • mapPartitions(func): like map, but runs once per partition (block); on an RDD of type T, func must be of type Iterator[T] => Iterator[U]
    • mapPartitionsWithIndex(func): like mapPartitions, but func also receives an integer partition index; on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U]
  3. Aggregation by key
    • reduceByKey([partitioner,]fun[,numPartitions]): combines the values of each key with fun
    • foldByKey(defaultValue[,numPartitions/partitioner])(fun): combines the values of each key with fun, starting from a default value
    • combineByKey[A](fun1,fun2,fun3): combines by key; fun1 is called the first time a key appears in a partition, fun2 when the key appears again in the same partition, and fun3 to merge results for the same key across partitions
  4. Grouping
    • groupByKey([partitioner|numPartitions]): groups values by key; the number of partitions or the partitioner can be specified
    • groupBy(fun[,numPartitions|Partitioner]): groups elements by the value returned by fun; the number of partitions or the partitioner can be specified
    • groupWith(otherRDD*): on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples; in other frameworks this operation is called CoGroup
  5. Joins
    • join(other): inner join
    • rightOuterJoin(other): right outer join
    • leftOuterJoin(other): left outer join
    • fullOuterJoin(other): full outer join
    • cogroup(other): grouped join
    • subtractByKey(other): difference by key
  6. Sorting
    • sortByKey([boolean]): sorts by key, ascending by default; boolean=false for descending, true for ascending
    • sortBy(fun,[boolean]): sorts by the value returned by fun, ascending by default; boolean=false for descending, true for ascending
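
A short sketch exercising a few of these operators (it assumes a SparkContext named sc is already available, for example from the briup helper above; the sample data is made up):

val lines = sc.makeRDD(Seq("spark core", "spark sql", "hadoop hdfs"), 2)

// Element-wise: flatMap splits each line into words, map builds (word, 1) pairs.
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

// Three ways to aggregate by key; each of them computes a word count.
val wc1 = pairs.reduceByKey(_ + _)
val wc2 = pairs.foldByKey(0)(_ + _)
val wc3 = pairs.combineByKey(
  (v: Int) => v,                    // first value of a key within a partition
  (acc: Int, v: Int) => acc + v,    // further values of the key in the same partition
  (a: Int, b: Int) => a + b)        // merge the per-partition results

// Grouping and sorting.
val grouped = pairs.groupByKey()                  // (word, Iterable(1, 1, ...))
val sorted  = wc1.sortBy(_._2, ascending = false) // by count, descending

// Join two pair RDDs on the word.
val lengths = wc1.keys.map(word => (word, word.length))
val joined  = wc1.join(lengths)                   // (word, (count, length))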

4.1.2 Actions

Actions are operations that return a result to the driver or write data to a storage system; an action triggers the evaluation of the transformations before it.

The common actions are listed below (a usage sketch follows the note after the list):

  1. Retrieving some of the elements
    • first: returns the first element (similar to take(1))
    • max: returns the largest element
    • min: returns the smallest element
    • top(num): returns the first num elements in descending order, as an array
    • take(num): returns an array with the first num elements of the dataset. Note that this is currently not executed in parallel across nodes; the driver machine computes and collects the elements itself, so use it with care (it increases memory pressure on the driver/gateway)
    • takeOrdered(n, [ordering]): returns the first n elements of the RDD using its natural ordering or a custom comparator
    • takeSample(withReplacement, num, [seed]): returns an array containing a random sample of num elements of the dataset, with or without replacement, optionally with a pre-specified random seed
  2. Reductions
    • reduce(func): folds the elements pairwise with the binary operation func
    • fold(defaultValue)(func): like reduce, but starting from a default value
    • aggregate[U](defaultValue)(func1,func2): applies func1 within each partition and func2 across partitions
  3. Writing to external systems
    • saveAsTextFile(path): save as text files
    • saveAsObjectFile(path): save as object files
    • saveAsSequenceFile(path) (Java and Scala): save as a SequenceFile
    • saveAsHadoopFile[OutputFormat[Key,Value]](path): save using a Hadoop OutputFormat
    • foreach(func): applies func to every element
    • foreachPartition(func): applies func to every partition
  4. Other operations
    • reduce(func): aggregates all elements of the dataset with func, which takes two arguments and returns one value; func must be associative so that it can be executed correctly in parallel
    • collect(): returns all elements of the dataset as an array to the driver program; this is usually used after filter or another operation that returns a sufficiently small subset of the data. Collecting an entire large RDD can easily make the driver run out of memory
    • count(): returns the number of elements in the dataset
    • countByValue: counts the occurrences of each distinct element
    • countByKey: counts the elements for each key
    • collectAsMap: returns the result as a map
    • lookup(key): returns all values associated with the given key

Note that every time a new action is invoked, the whole RDD lineage is recomputed from scratch (unless the RDD is cached, see below).
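
A small sketch of these actions (again assuming sc is available); because nums is not cached, each action below re-runs the lineage from the start:

val nums = sc.makeRDD(1 to 10, 2)

println(nums.count())                 // 10
println(nums.first())                 // 1
println(nums.take(3).mkString(","))   // 1,2,3
println(nums.top(2).mkString(","))    // 10,9
println(nums.reduce(_ + _))           // 55
println(nums.fold(0)(_ + _))          // 55

// aggregate: build (sum, count) within each partition, then merge across partitions.
val (sum, cnt) = nums.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b)   => (a._1 + b._1, a._2 + b._2))
println(s"avg = ${sum.toDouble / cnt}")   // avg = 5.5

val pairs = nums.map(n => (n % 2, n))
println(pairs.countByKey())           // Map(0 -> 5, 1 -> 5)
println(pairs.lookup(0))              // all the even values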

4.1.3 Caching

When an RDD is needed by several queries, it can be cached in memory so that subsequent queries reuse it, which speeds them up.

Related methods:

  • cache()
  • persist([storage level])
  • unpersist()

Persistence levels:

  • MEMORY_ONLY: store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed. This is the default level.
  • MEMORY_AND_DISK: store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk and read them from there when needed.
  • MEMORY_ONLY_SER (Java and Scala): store the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer (http://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER (Java and Scala): similar to MEMORY_ONLY_SER, but spills partitions that don't fit in memory to disk instead of recomputing them each time they are needed.
  • DISK_ONLY: store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: same as the levels above, but each partition is replicated on two cluster nodes.
  • OFF_HEAP (experimental): similar to MEMORY_ONLY_SER, but stores the data in off-heap memory (http://spark.apache.org/docs/latest/configuration.html#memory-management). This requires off-heap memory to be enabled.

Note that cache and persist are lazy, like transformations: the data is only actually cached when the first action is triggered.
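
A sketch of caching the parsed gender column of the users.dat file used in the case study below, so that two actions share one pass over the data (the storage level is just an example):

import org.apache.spark.storage.StorageLevel

val users   = sc.textFile("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
val genders = users.map(_.split("::")(1))

genders.persist(StorageLevel.MEMORY_AND_DISK)   // lazy: nothing is read yet

println(genders.count())          // first action: reads the file and fills the cache
println(genders.countByValue())   // second action: served from the cached partitions

genders.unpersist()               // release the cached blocks when done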

4.1.4 Case study

Case: count the number of male and female users in the user table.

Data path: hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat

Data format: UserID::Gender::Age::Occupation::Zip-code

Implementation:

object MovieTest{
  def main(args:Array[String]):Unit={
    //Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Case - user count by gender")
    //Build the SparkContext object
    val sc=SparkContext.getOrCreate(conf)

    //1. Read the dataset
    val userRDD=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    //2. Split each line on "::" and keep the second field, the gender
    val mapRDD=userRDD.map(line=>line.split("::")(1))
    //3. Turn each element into a pair: (gender, 1)
    val map2RDD=mapRDD.map(gender=>(gender,1))
    //4. Sum the values per key to get the number of users of each gender
    val resultRDD=map2RDD.reduceByKey(_+_);
    //val resultRDD=map2RDD.groupByKey.mapValues(value=>value.reduce(_+_));
    //5. Print the result to the console
    resultRDD.foreach(println)
    //6. Save the result to text files
    resultRDD.saveAsTextFile("userNumByGender_result_1")

    //Release resources
    sc.stop()

  }
}

Question: if we added caching to the code above to improve efficiency, where should the cache call go?

Question: comparing the reduceByKey line with the commented-out groupByKey alternative in the case above, what is the difference between reduceByKey and groupByKey?

4.2 Dependencies

The abstract RDD follows the decorator (wrapper) structural design pattern, as illustrated by the figure drawn from the word-count example of Chapter 1, "Getting to Know Spark".

[Figure: the chain of wrapped RDDs in the word-count example]

Because a transformation returns a new RDD, a relationship exists between the new RDD and the original one; this relationship is called a dependency.

Different operators may produce different kinds of dependencies, depending on their characteristics.

  • For example, map produces a narrow dependency,
  • while join generally produces a wide dependency.

Dependencies are classified as:

  • Narrow dependency
    • each partition of the parent RDD is used by at most one partition of the child RDD
  • Wide dependency
    • a partition of the parent RDD is used by multiple partitions of the child RDD

Examples of dependency relationships:

[Figure: dependency examples, including the three narrow-dependency cases discussed below]

Question: what can be accomplished with the dependency information?

Hints:

  • Task scheduling

    • The dependencies form a DAG, which is used to split the job into Stages and produce TaskSets.
  • Fault tolerance

    • When some partitions are lost, they can be recomputed through their dependencies, without recomputing every partition.
    • In Spark this is commonly referred to as lineage (the lineage graph); see the sketch below.
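
The lineage can be inspected directly from the driver. A small sketch (sc assumed) using toDebugString, which prints the chain of parent RDDs that Spark would replay to rebuild a lost partition:

val pairs  = sc.makeRDD(1 to 100, 4).map(n => (n % 10, n))
val summed = pairs.reduceByKey(_ + _)

// The indented entries are the parent RDDs this result depends on.
println(summed.toDebugString)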

The source of the Dependency base class is as follows:

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

4.2.1 Narrow dependencies

Each partition of the parent RDD is used by at most one partition of the child RDD.

The concrete subclasses are:

OneToOneDependency
RangeDependency

The source of NarrowDependency and its concrete subclasses is as follows:

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

Question: in the dependency example figure, which of the three narrow-dependency cases are OneToOneDependency, and which are RangeDependency?

Answer: cases 1 and 3 are OneToOneDependency; case 2 is RangeDependency.
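
These dependency classes can also be checked programmatically. A small sketch (sc assumed) that prints the Dependency subclass produced by map, union and reduceByKey:

import org.apache.spark.rdd.RDD

val base = sc.makeRDD(1 to 10, 2)

val mapped   = base.map(_ + 1)                               // narrow: OneToOneDependency
val unioned  = base.union(sc.makeRDD(11 to 20, 2))           // narrow: one RangeDependency per parent
val shuffled = base.map(n => (n % 2, n)).reduceByKey(_ + _)  // wide: ShuffleDependency

Seq[(String, RDD[_])]("map" -> mapped, "union" -> unioned, "reduceByKey" -> shuffled)
  .foreach { case (name, rdd) =>
    println(s"$name -> ${rdd.dependencies.map(_.getClass.getSimpleName).mkString(", ")}")
  }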

4.2.2 Wide dependencies

A partition of the parent RDD is used by multiple partitions of the child RDD. The corresponding class is ShuffleDependency:

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}

4.3 Directed Acyclic Graph

In the task execution mechanism, the dependency relationships between RDDs form a directed acyclic graph (DAG).

Write code following the steps below, then inspect the resulting DAG.

  1. Read a log file
  2. Filter the lines containing "error"
  3. Starting again from step 1, filter the lines containing "warning"
  4. Union the RDDs from steps 2 and 3
  5. Count the matching lines
  6. Take ten matching lines and print them to the console
object Test{
  def main(args:Array[String]):Unit={
    //Build the SparkConf object
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("Demo - DAG")
    //Build the SparkContext object
    val sc=SparkContext.getOrCreate(conf)

    val inputRDD=sc.textFile("log.txt")
    val errorRDD=inputRDD.filter(line => line.contains("error"))
    val warningRDD=inputRDD.filter(line => line.contains("warning"))
    val badLinesRDD=errorRDD.union(warningRDD)

    println("Input had "+badLinesRDD.count+" concerning lines")
    println("Here are 10 examples:")
    badLinesRDD.take(10).foreach(println)

    //Release resources
    sc.stop()
  }
}

Open the Spark web UI and look at the DAG visualization.

[Figure: the DAG visualization from the Spark web UI]

Question: given the decorator-pattern design of RDD, draw the graph of RDD transformations involved in this application.

5. Partitioning

  • A Spark program can reduce communication cost by controlling how its RDDs are partitioned
  • The partitioner decides, based on the key, which partition each record goes to
  • Only key-value RDDs can have a partitioner
  • For non key-value RDDs, partitioner is None

The Partitioner source code is as follows:

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 *
 * Note that, partitioner must be deterministic, i.e. it must return the same partition id given the same partition key.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

object Partitioner {
/**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
   *
   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
   * partitioner is eligible (number of partitions within an order of maximum number of partitions
   * in rdds), or has partition number higher than or equal to default partitions number - we use
   * this partitioner.
   *
   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
   * number of partitions in the largest upstream RDD, as this should be least likely to cause
   * out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    // If the existing max partitioner is an eligible one, or its partitions number is larger
    // than or equal to the default number of partitions, use the existing partitioner.
    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }

  /**
   * Returns true if the number of partitions of the RDD is either greater than or is less than and
   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
   * false.
   */
  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }
}

5.1 Partitioner objects

5.1.1 HashPartitioner

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
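
A quick sketch of how HashPartitioner maps keys to partition ids (the keys are arbitrary examples):

import org.apache.spark.HashPartitioner

val hp = new HashPartitioner(3)

// nonNegativeMod(key.hashCode, 3), as in the source above.
Seq("spark", "core", "rdd", "partition").foreach { key =>
  println(s"$key -> partition ${hp.getPartition(key)}")
}

println(hp.getPartition(null))   // null keys always go to partition 0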

5.1.2 RangePartitioner

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  // A constructor declared in order to maintain backward compatibility for Java, when we add the
  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}

private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}
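
A RangePartitioner samples an existing pair RDD to compute its range bounds. A usage sketch (sc assumed):

import org.apache.spark.RangePartitioner

val pairRDD = sc.makeRDD(1 to 100, 4).map(n => (n, n.toString))

// Sample pairRDD to determine the bounds, then repartition by key ranges.
val rangePartitioner = new RangePartitioner(3, pairRDD)
val ranged = pairRDD.partitionBy(rangePartitioner)

println(ranged.partitioner)        // Some(RangePartitioner)
println(ranged.getNumPartitions)   // at most 3 (can be fewer, see the note above)

// sortByKey also uses a RangePartitioner internally.
println(pairRDD.sortByKey().partitioner)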

5.1.3 Custom Partitioner

Extend Partitioner and override its two abstract methods.

Example:


import org.apache.spark.Partitioner

case class Student(name:String,age:Int,gender:String)

class GenderPartitioner(numParts:Int) extends Partitioner{
  override def numPartitions:Int = numParts

  override def getPartition(key: Any): Int = key match {
    case null => 0
    case x:Student => math.abs(x.gender.hashCode) % numParts
    case _  => throw new UnsupportedOperationException("unsupported key type")
  }

  override def hashCode(): Int = numParts

  override def equals(o: Any): Boolean = o match {
    case x:GenderPartitioner => x.numPartitions == numParts
    case _ => false
  }

}

5.2 The partitioner of an RDD

Check an RDD's partitioner: rdd.partitioner

Question: how does an RDD end up with a partitioner?

Answer:

  • Explicit partitioning
    • the dedicated method partitionBy
  • Partition-aware operations
    • call one of the methods that produce a partitioned result
      • cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sortByKey(), and (if the parent RDD has a partitioner) filter, mapValues and flatMapValues; the results of all other operations carry no particular partitioner.
      • If the RDD already has a partitioner before one of these operations is called, the operation can benefit from it:
      • in that case the data does not need to be shuffled again, which reduces the shuffle cost.

Demo code:

import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionerTest {
  def main(args: Array[String]): Unit = {

    val conf=new SparkConf
    conf.setMaster("local[*]")
    conf.setAppName("Partitioner exercise")
    val sc=SparkContext.getOrCreate(conf)

    val seq=Seq(
                Student("larry",56,"男"),
                Student("renen",50,"女"),
                Student("kevin",46,"男"),
                Student("tarry",36,"男"),
                Student("mark",32,"男")
               )
    val rdd: RDD[Student] =sc.makeRDD(seq,3)
    val pairRDD: RDD[(Student, Int)] =rdd.map(stu=>stu->stu.age)

    println(s"partitioner: ${pairRDD.partitioner}")
    println(s"number of partitions: ${pairRDD.getNumPartitions}")

    //How can an RDD get a partitioner?

    //1. The dedicated method partitionBy

    //1.1 HashPartitioner
    val partitioner=new HashPartitioner(5)
    //1.2 RangePartitioner requires an Ordering on the key type (Student has none); see 5.1.2
    //val partitioner=new RangePartitioner(2,pairRDD)   // would not compile with Student keys
    //1.3 Custom partitioner
    //val partitioner=new GenderPartitioner(2)

    val partitionerRDD=pairRDD.partitionBy(partitioner)
    println(s"partitioner: ${partitionerRDD.partitioner}")
    println(s"number of partitions: ${partitionerRDD.getNumPartitions}")

    //2. Call a method that produces a partitioned result
    val nameRDD=rdd.map(stu=>stu.name->stu)
    val groupRDD: RDD[(String, Iterable[Student])] = nameRDD.groupByKey()
    //val groupRDD: RDD[(String, Iterable[Student])] = nameRDD.groupByKey(partitioner = new HashPartitioner(6))
    println(s"partitioner: ${groupRDD.partitioner}")
    println(s"number of partitions: ${groupRDD.getNumPartitions}")

    val sortRDD: RDD[(String, Student)] = nameRDD.sortByKey()
    println(s"partitioner: ${sortRDD.partitioner}")
    println(s"number of partitions: ${sortRDD.getNumPartitions}")

    //Compare the following two lines: map discards the partitioner of groupRDD,
    //while mapValues preserves it.
    groupRDD.map(elem=>(elem._1,elem._2.map(_.age))).sortByKey().foreach(println)
    groupRDD.mapValues(stus=>stus.map(_.age)).sortByKey().foreach(println)

    //Release resources
    sc.stop();
  }
}

Notes:

  • For binary operations, the partitioner of the output depends on the partitioners of the parent RDDs.
  • By default the result is hash-partitioned, with as many partitions as the operation's parallelism.
  • If one of the parent RDDs already has a partitioner, the result adopts that partitioner.
  • If both parent RDDs have partitioners, the result adopts the first parent's partitioner (the defaultPartitioner source above shows the exact rule, which prefers the parent with more partitions).
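
A brief sketch of these rules using join (sc assumed):

import org.apache.spark.{HashPartitioner, RangePartitioner}

val left  = sc.makeRDD((1 to 10).map(n => (n, "L")), 4)
val right = sc.makeRDD((1 to 10).map(n => (n, "R")), 4)

// Neither parent has a partitioner: the result gets a default HashPartitioner.
println(left.join(right).partitioner)

// One parent is hash-partitioned: the result reuses that partitioner.
val hashedLeft = left.partitionBy(new HashPartitioner(4))
println(hashedLeft.join(right).partitioner)

// Both parents are partitioned: the result keeps hashedLeft's HashPartitioner
// (here the first parent, which is also the one with more partitions).
val rangedRight = right.partitionBy(new RangePartitioner(3, right))
println(hashedLeft.join(rangedRight).partitioner)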

6. Preferred Locations

A list of the preferred locations of each Partition.

  • For an HDFS file, this list holds the block locations of each Partition.

Note that, following the principle that moving computation is cheaper than moving data, Spark's task scheduler tries to assign each task to a node where that task's data already resides, based on the stored location information.
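
The preferred locations can be queried from the driver. A small sketch (sc assumed, using the HDFS path from the earlier examples):

val rdd = sc.textFile("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")

// For an HDFS-backed RDD this prints the hosts holding each block replica.
rdd.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
}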

Source: https://blog.csdn.net/angeliacmm/article/details/116431259
