标签:zeroValue value1 分区 reduce value Rdd keyValue key aggregateByKey
1. 定义
/* * 1. 定义 * def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner) * (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] * * def aggregateByKey[U: ClassTag](zeroValue: U) * (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] * * def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int) * (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] * * * 2.功能 * 将数据根据不同的规则进行 分区内计算 和 分区间计算 * 操作流程 * 1. 分区内 对相同的key 分组 * 示例 : key iter(value1,value2,value3) * * 2. 根据出入的规则 seqOp: (U, V) => U 对分区内相同的key 做聚合操作 * 示例 : seqOp(zeroValue,value1)... * * 3. 聚合后输出每个分区的结果 key,value * * 4. 拉取每个分区 的key,value ,并对相同key 的value做reduce操作(存在Shuffle过程) * 示例 : combOp(zeroValue,value1)... * * 5. 对 所有分区的key 做完reduce操作后,按照指定的 partitioner 重新对结果分区 * 不指定分区器时,用默认分区器 HashPartitoner * 不指定分区个数时,用父Rdd分区个数 * * 3.note * zeroValue 会参与 分区内和分区间的 reduce操作 * * */
2. 示例
object aggregateTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2) private val rdd2 = rdd.aggregateByKey("")( //分区内计算规则 (从左往右计算) //对分区内 相同key 的value 做reduce操作 (zeroValue: String, value1: String) => { println(s"key:${zeroValue} value:${value1}") zeroValue + "-" + value1 } //拉取各个分区 key-value,对相同key 的value 做reduce操作 , (zeroValue, par_value) => zeroValue + par_value ) println(s"${rdd2.collect().mkString(",")}") sc.stop() }
标签:zeroValue,value1,分区,reduce,value,Rdd,keyValue,key,aggregateByKey 来源: https://www.cnblogs.com/bajiaotai/p/16054066.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。