标签:类型转换 flatMap CompactBuffer String val RDD 键值 算子 println
1. groupByKey
- 定义:groupByKey([numPartitions])、
- 解释:只对键值对类型RDD生效,同时返回的是一个新的RDD[(key,Iterator[Value])]
- 案例:
def groupByKeyOper(sc: SparkContext): Unit = {
println("----------------groupByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val groupByKey: RDD[(String, Iterable[Int])] = map.groupByKey()
val ptln: String = groupByKey.collect().mkString("=")
println(ptln) // (spark,CompactBuffer(1))=(hive,CompactBuffer(1, 1, 1))=(hadoop,CompactBuffer(1, 1))=(Azkaban,CompactBuffer(1))=(Math,CompactBuffer(1))=(Chinese,CompactBuffer(1))=(English,CompactBuffer(1))=(mapreduce,CompactBuffer(1, 1, 1))=(flink,CompactBuffer(1, 1))=(kafka,CompactBuffer(1, 1))=(hbase,CompactBuffer(1, 1))=(Hadoop,CompactBuffer(1))
println("----------------groupByKey结束------------------")
}
2. reduceByKey
- 定义:reduceByKey(func, [numPartitions])
- 解释:对相同的key值的value数据通过func函数进行聚合操作(总和、最大值、最小值...)返回的是一个RDD[(Key,Value的类型---value代表的是相同key值聚合之后的结果)]
- 案例:
def reduceByKeyOper(sc: SparkContext) = {
println("----------------reduceByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val reduceByKey: RDD[(String, Int)] = map.reduceByKey((_ + _))
println(reduceByKey.collect().mkString("=")) // (spark,1)=(hive,3)=(hadoop,2)=(Math,1)=(Azkaban,1)=(Chinese,1)=(English,1)=(mapreduce,3)=(flink,2)=(kafka,2)=(hbase,2)=(Hadoop,1)
println("----------------reduceByKey结束------------------")
}
3. aggregateByKey
- 定义:aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
- 解释:
(1)零值:初始值
(2)seqOp函数:对每一个分区中key值相同的value数据和零值聚合操作
(3)combOp函数:对不同分区key值相同已经通过seqOp函数计算出来的聚合结果 在和零值聚合一次
当前这个算子是一个转换算子,返回一个新的键值对类型的RDD 其中RDD中key还是原来的key value是aggregateByKey聚合之后的结果 - 案例:
def aggregateByKeyOper(sc: SparkContext) = {
println("----------------reduceByKey开始------------------")
val rdd = sc.textFile("hdfs://node1:9000/wc.txt")
val flatMap: RDD[String] = rdd.flatMap((line: String) => {
line.split(" ")
})
val map = flatMap.map((_, 1))
val agg: RDD[(String, Int)] = map.aggregateByKey(0)(
(a: Int, b: Int) => {
a + b
},
(a: Int, b: Int) => {
a + b
}
)
agg.foreach(println(_)) // (spark,1) (hive,3) (hadoop,2) (Math,1) (Azkaban,1) (Chinese,1) (English,1) (mapreduce,3) (flink,2) (kafka,2) (hbase,2) (Hadoop,1)
println("----------------reduceByKey结束------------------")
}
标签:类型转换,flatMap,CompactBuffer,String,val,RDD,键值,算子,println 来源: https://www.cnblogs.com/jsqup/p/16618549.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。