ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark追妻系列(Value类型的RDD)

2022-02-03 17:33:22  阅读:182  来源: 互联网

标签:sparkContext value val 分区 追妻 RDD Value value1 new


今天是大年初三,猴赛雷

小谈

        这几天每天晚上给她发一个红包,拜年红包而且还可以添加表情包。感觉现在过年好没有年味吖。嗑瓜子磕的嗓子都疼了。

        Spark中的算子有很多,有Value类型,双Value类型,这两天写的都是Value类型的,昨天讲的是关于map的映射。

        今天讲剩余的算子

glom

        glom算子将RDD中的每一个分区变成一个数组,并放置在RDD中,数组中的元素类型与原分区中的类型相同。原本这个分区里面的数是分散的,glom之后,这个分区里面的元素会变成一个数组。

def glom(): RDD[Array[T]] = withScope { 
new MapPartitionsRDD[Array[T], T]
(this, (_, _, iter) => Iterator(iter.toArray))

        可以看出来,glom底层用到了MapPartitionsRDD对象,这个MapPartitionsRDD对象底层重写了getPartitions方法,这个方法用到了RDD的依赖,保持分区的数据不变,然后将分区的数据转变成数组类型。

        来一个例子,计算所有分区最大值的求和。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") 
val sparkContext = new SparkContext(sparkConf) 
// 两个分区 12 在分区1 35 在分区2 
val value = sparkContext.makeRDD(List(1, 2, 3, 5), 2) 
//对每个分区的数组glom 
val value1 = value.glom() 
//找到每个数组的最大值 
val value2 = value1.map(array => array.max) 
//将每个数组最大值进行求和 
val sum = value2.collect().sum

        分区最大值求和

        分区1的最大值: 2 分区2的最大值: 5

        collect之后,每个节点给Driver返回每个数组的最大值,之后相加

groupBy

        将数据按照指定的规则进行分组,分区默认不变,但是可能分组后的数据不再原来的分区,数据会被打乱,这样的操作就是Shuffle。极限情况下,数据可能分在同一个分区。

        将相同的key的数据分到一个迭代器里面。

        下面举一个例子

 

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
//将数据按照指定的规则进行分组 
val value = sparkContext.makeRDD(List(1, 2, 3, 4), 2) 
val value1 = value.groupBy(_ % 2 == 0) 
value1.collect().foreach(println(_))

        刚开始 两个分区

        分区1: 1 2 分区2:3 4

        group by之后,数据发生改变,只有一个分区存在数据

(1,(false,CompactBuffer(1, 3))) (1,(true,CompactBuffer(2, 4)))

        可以看到,分区后的数据并不是上面图解的那样,因为发生了Shuffle

        为了体现出group by的作用,下面将将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List("Hello", "hbase", "Hive", "Hadoop"), 2) 
val value1 = value.groupBy(_.charAt(0)) 
value1.collect().foreach(println(_)) }

        看一下结果

(h,CompactBuffer(hbase)) (H,CompactBuffer(Hello, Hive, Hadoop))

        前面的字符是我们分组的依据,后面的迭代器就是分组后的元素

WordCount

        学习mr的时候,第一个例子就是求WordCount。下面也用Spark来进行操作这就是简易的wc。首先将每个单词分开,分开之后对每个单词进行计数(单词,1),计完数后,根据单词分组,单词分组之后求迭代器的大小。

​
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List("Hello Spark", "Hello Scala"), 2) 
//先把单词分成一个一个 
val value1 = value.flatMap(_.split(" ")) 
//对每个单词进行计数 
val value2 = value1.map((_, 1)) 
//分组 
val value3 = value2.groupBy(_._1) 
val value5 = value3.map {
 case (word, iter) => (word, iter.size) } 

value5.collect().foreach(println(_))

​

Filter

        将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。对数据进行筛选之后,分区不变,但是分区内的数据可能不均衡,可能会造成数据倾斜。

 

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val value1 = value.filter(_ != 1) 
value1.collect().foreach(println(_))

Distinct

        对数据集中的重复数据进行去重

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4,4)) 
val value1 = value.distinct()

        如果在distinct中没有参数,那么最后的结果就是1,2,3,4

        如果在distinct中有参数,比如2,那么最后的结果就会有两个分区,原本一个分区的(因为设置的setMaster为Local,所以默认一个分区),原本一个分区的数据,分散到两个分区里面,发生了shuffle

coalesce

        根据数据量缩减分区。如果分区数过多,可以进行减少分区。

        coalesce这个算子里面的参数有一个 shuffle ,如果为True,那么这个coalesce就会将数据shuffle,会将数据进行打乱,如果为False,那么数据就不会被打乱。

        先来看看例子把

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
//四个数据,三个分区 1 2 34 
val value = sparkContext.makeRDD(List(1, 2, 3, 4),3) 
//减少分区为两个,shuffle默认为False,不会打乱数据 
val value1 = value.coalesce(2) 
//可以来看数据的分区 
val value2 = value1.mapPartitionsWithIndex(
(index, iter) => { iter.map(num => (index, num)) })
value2.collect().foreach(println(_))

        首先看一下先前的分区

        分区1:1 分区2 :2 分区3:3 4

        现在看一下在shuffle参数为False的情况下,分区的情况

        可以看到,分区真的减少了,明明数据已经不再第三个分区了,跑到第二个分区了,这数据不就被打乱了。

        其实并不是,看一看 分区3的数据 3和4 一起进入了第二个分区,第三分区的数据没有被打乱。这就是所谓的shuffle,并没有将数据打乱,只是将原来的数据放到了其它的分区。

        现在看一下shuffle参数为True的情况下

        如果有Shuffle了,1 2 3 4 分到两个分区里面,

        分区1 : 1 3 分区2: 2 4

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
//四个数据,三个分区 1 2 34 
val value = sparkContext.makeRDD(List(1, 2, 3, 4),3) 
//减少分区为两个,shuffle参数为true,会有shuffle阶段 
val value1 = value.coalesce(2,true) 
//可以来看数据的分区 
val value2 = value1.mapPartitionsWithIndex(
(index, iter) => { iter.map(num => (index, num)) })
value2.collect().foreach(println(_))

        可以看到,原来的第一分区,第二分区里面的数据不改变,只是将第三个分区里面的数据放入到两个分区里面

repartition

        repartition算子和coalesce算子基本相同,不过reaprtition算子可以将分区数目减少,也可以将分区数目增大,不论是增大或减小,repartition算子一定会有shuffle阶段,一定会打乱数据。

 

        可以看到,shuffle参数默认就是true。

        就是会改变数据的分区

sortBy

        该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理

的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一

致。中间存在 shuffle 的过程

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(4, 3, 1, 2)) 
val value1 = value.sortBy(num => num) 
value1.collect().foreach(println(_))

最后结果

 

可以看到是升序排序的,如果想降序排序,就添加一个参数false

        默认是true,按照升序排序

总结

        今天总算将Value类型的算子讲完了,明天会将双Value类型的算子讲解完毕。

        今年是大年初三,过年也要学习哦

标签:sparkContext,value,val,分区,追妻,RDD,Value,value1,new
来源: https://blog.csdn.net/weixin_46300771/article/details/122777110

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有