详解系列四"/>
Spark core详解系列四
RDD练习题一
要求:数据如下
a,1,3
a,2,4
b,1,1
根据数据第一列统计得到如下结果
a,3,7
b,1,1
用RDD实现。
实现功能核心代码如下:
val input = sc.parallelize(List(List("a", 1, 3),List("a", 2, 4),List("b", 1, 1)
))
input.map(x => {val key = x(0).toStringval v1 = x(1).toString.toIntval v2 = x(2).toString.toInt(key, (v1, v2))
}).reduceByKey((x, y) => {(x._1 + y._1, x._2 + y._2)
}).map(x => (x._1, x._2._1, x._2._2)).collect.foreach(println)
RDD练习题二
要求:数据如下
"1000000,一起看|电视剧|军旅|士兵突击,1,0",
"1000000,一起看|电视剧|军旅|士兵突击,1,1",
"1000001,一起看|电视剧|军旅|我的团长我的团,1,1",
统计得到如下结果:
((1000000,一起看),(2,1))
((1000000,电视剧),(2,1))
((1000000,军旅),(2,1))
((1000000,士兵突击),(2,1))
((1000001,一起看),(1,1))
((1000001,电视剧),(1,1))
((1000001,军旅),(1,1))
((1000001,我的团长我的团),(1,1))
实现功能核心代码如下:
val input = sc.parallelize(List("1000000,一起看|电视剧|军旅|士兵突击,1,0", "1000000,一起看|电视剧|军旅|士兵突击,1,1","1000001,一起看|电视剧|军旅|我的团长我的团,1,1"
))val processRDD = input.flatMap(x => {val splits = x.split(",")val id = splits(0).toIntval word = splits(1)val show = splits(2).toIntval clicks = splits(3).toIntval words = word.split("\\|")words.map(x => ((id, x), (show, clicks)))
})processRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).collect.foreach(println)
processRDD.groupByKey().mapValues(x => {val totalShows = x.map(_._1).sumval totalClicks = x.map(_._2).sum(totalShows, totalClicks)
}).collect.foreach(println)
补充知识点:
reduceByKey和groupByKey对比 两个都是shuffle算子,但reduceByKey有预聚合功能,所以reduceByKey的shuffle数据传输更小,生产上优先选择reduceByKey。两者的分区器默认都是 HashPartitioner,分区数默认就是并行度。
RDD练习题三
要求:求组内 Top N
日志文件数据形如:www.baidu,url1
思路一:利用 toList 然后排序,take出topN,但是toList存在安全隐患,适合小数据量。
val processRDD = input.map(x => {val splits = x.split(",")val site = splits(0)val url = splits(1)((site, url), 1)
})processRDD.reduceByKey(_+_).groupBy(_._1._1).mapValues(x=>{x.toList.sortBy(-_._2) // toList是一个很大的安全隐患.map(x=>(x._1._2,x._2)).take(TOPN)}).map(x=>(x._1,x._2(0),x._2(1))).printInfo()
思路二:分而治之
val sites = processRDD.map(_._1._1).distinct().collect() // collect将RDD转化为数组,因为两个RDD不能嵌套
sites.foreach(x=>{processRDD.filter(_._1._1 == x).reduceByKey(_+_).sortBy(-_._2).take(TOPN).foreach(println)
})
补充知识点:
1.RDD不支持嵌套,即不能使用如:rdd1.map(x => rdd2.values.count() * x)
解决办法:将其中一个rdd转化成数组,如使用 rdd1.collect(),但是collect算子只适用于小数据量。
2.foreach算子和map算子区别:map返回新的RDD,且是transformation算子;foreach无返回值,且是action算子。
3.take是action算子,返回结果是一个Array,不需要collect,直接可以 foreach(println)。
更多推荐
Spark core详解系列四
发布评论