ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

寒假学习进度6

2021-12-31 23:04:41  阅读:189  来源: 互联网

标签:sparkContext val List 学习 rdd 寒假 进度 new sparkConf


今天继续学习sparkRDD的算子

(1)flatMap

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[List[Int]]= sparkContext.makeRDD(List(List(1, 2), List(3, 4)))
//flatmap,讲List变成Int
//使用flatmap进行扁平化处理,将List集合里数据进行拆分
val flatrdd: RDD[Int] = rdd.flatMap(
list => {
list //讲拆分的数据进行封装成一个LIst
}
)
flatrdd.collect().foreach(println)
sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String]= sparkContext.makeRDD(List("hello word","hello spark"))
//flatmap
//使用flatmap进行扁平化处理,将List集合里数据进行拆分,用空格做分隔符
val flatrdd: RDD[String] = rdd.flatMap(
s => {
s.split(" ")
}
)
flatrdd.collect().foreach(println)
sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator11")
val sparkContext = new SparkContext(sparkConf)

val rdd= sparkContext.makeRDD(List(List(1, 2), 3,List(4, 5)))
//flatmap
//因为list集合里类型不一致,所以使用模式匹配的方式,讲不是集合的封装成一个集合
val flatrdd: RDD[Any] = rdd.flatMap(
data => {
data match {
case list: List[_] => list
case data => List(data)

}
}
)
flatrdd.collect().foreach(println)
sparkContext.stop()
}
 

(2)glom

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
//讲Int变成Array
val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

val glomrdd: RDD[Array[Int]] = rdd.glom()

glomrdd.collect().foreach(data=>println(data.mkString(",")))
sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
//将Int变成Array
val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
val glomrdd: RDD[Array[Int]] = rdd.glom()

//将2个分区数组数据(Array)用map中的max求每个分区中最大值
val maxRdd: RDD[Int] = glomrdd.map(
array => {
array.max
}
)

//将maxRdd 2个分区数组采集求和
println(maxRdd.collect().sum)
sparkContext.stop()
}

(3)groupBy

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
def groupFunction(num:Int)={
num%2
}

val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sparkContext.stop()
}

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)
val rdd: RDD[String] = sparkContext.makeRDD(List("hello","spark","hi","sss"), 2)

val grouprdd: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
grouprdd.collect().foreach(println)

sparkContext.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

//读取apache.log文件
val rdd=sparkContext.textFile("data/apache.log")

//取数据中每小时的点击量
val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
line => {
//将每行数据以空格为分割,分成多个字符串
val data = line.split(" ")
//取第4个字符串
val time = data(3)

//转换格式
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
//解析time
val datas= sdf.parse(time)
//取“小时”字符
val sdf1 = new SimpleDateFormat("HH")
//格式化字符
val hour = sdf1.format(datas)
(hour, 1)//比如08小时出现一次计1个
}
).groupBy(_._1)

timeRDD.map{
//模式匹配
case (hour,iter)=>{
(hour,iter.size)
}
}.collect().foreach(println)

sparkContext.stop()
}

(4)filter

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)

//filter,根据符合规则的数据筛选
val rdd= sc.makeRDD(List(1,2,3,4), 2)
val fliterrdd: RDD[Int] = rdd.filter(
num => num % 2 != 0
)
fliterrdd.collect().foreach(println)
sc.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)

//filter,根据符合规则的数据筛选
val rdd=sc.textFile("data/apache.log")
rdd.filter(
line=>{
//将每行数据以空格为分割,分成多个字符串
val data = line.split(" ")
//取第4个字符串
val time = data(3)
time.startsWith("17/05/2015")
}
).collect().foreach(println)

sc.stop()
}

 

标签:sparkContext,val,List,学习,rdd,寒假,进度,new,sparkConf
来源: https://www.cnblogs.com/chenghaixiang/p/15754615.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有