Spark core详解系列四

编程入门 行业动态 更新时间:2024-10-09 20:23:22

Spark core<a href=https://www.elefans.com/category/jswz/34/1770044.html style=详解系列四"/>

Spark core详解系列四

RDD练习题一

要求:数据如下

a,1,3
a,2,4
b,1,1

根据数据第一列统计得到如下结果

a,3,7
b,1,1

用RDD实现。

实现功能核心代码如下:

val input = sc.parallelize(List(List("a", 1, 3),List("a", 2, 4),List("b", 1, 1)
))
input.map(x => {val key = x(0).toStringval v1 = x(1).toString.toIntval v2 = x(2).toString.toInt(key, (v1, v2))
}).reduceByKey((x, y) => {(x._1 + y._1, x._2 + y._2)
}).map(x => (x._1, x._2._1, x._2._2)).collect.foreach(println)

RDD练习题二

要求:数据如下

"1000000,一起看|电视剧|军旅|士兵突击,1,0", 
"1000000,一起看|电视剧|军旅|士兵突击,1,1",
"1000001,一起看|电视剧|军旅|我的团长我的团,1,1",

统计得到如下结果:

((1000000,一起看),(2,1))
((1000000,电视剧),(2,1))
((1000000,军旅),(2,1))
((1000000,士兵突击),(2,1))
((1000001,一起看),(1,1))
((1000001,电视剧),(1,1))
((1000001,军旅),(1,1))
((1000001,我的团长我的团),(1,1))

实现功能核心代码如下:

val input = sc.parallelize(List("1000000,一起看|电视剧|军旅|士兵突击,1,0", "1000000,一起看|电视剧|军旅|士兵突击,1,1","1000001,一起看|电视剧|军旅|我的团长我的团,1,1"
))val processRDD = input.flatMap(x => {val splits = x.split(",")val id = splits(0).toIntval word = splits(1)val show = splits(2).toIntval clicks = splits(3).toIntval words = word.split("\\|")words.map(x => ((id, x), (show, clicks)))
})processRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).collect.foreach(println)
processRDD.groupByKey().mapValues(x => {val totalShows = x.map(_._1).sumval totalClicks = x.map(_._2).sum(totalShows, totalClicks)
}).collect.foreach(println)

补充知识点

reduceByKey和groupByKey对比 两个都是shuffle算子,但reduceByKey有预聚合功能,所以reduceByKey的shuffle数据传输更小,生产上优先选择reduceByKey。两者的分区器默认都是 HashPartitioner,分区数默认就是并行度。

RDD练习题三

要求:求组内 Top N
日志文件数据形如:www.baidu,url1

思路一:利用 toList 然后排序,take出topN,但是toList存在安全隐患,适合小数据量。

val processRDD = input.map(x => {val splits = x.split(",")val site = splits(0)val url = splits(1)((site, url), 1)
})processRDD.reduceByKey(_+_).groupBy(_._1._1).mapValues(x=>{x.toList.sortBy(-_._2)  // toList是一个很大的安全隐患.map(x=>(x._1._2,x._2)).take(TOPN)}).map(x=>(x._1,x._2(0),x._2(1))).printInfo()

思路二:分而治之

val sites = processRDD.map(_._1._1).distinct().collect()  // collect将RDD转化为数组,因为两个RDD不能嵌套
sites.foreach(x=>{processRDD.filter(_._1._1 == x).reduceByKey(_+_).sortBy(-_._2).take(TOPN).foreach(println)
})

补充知识点

1.RDD不支持嵌套,即不能使用如:rdd1.map(x => rdd2.values.count() * x)
解决办法:将其中一个rdd转化成数组,如使用 rdd1.collect(),但是collect算子只适用于小数据量。
2.foreach算子和map算子区别:map返回新的RDD,且是transformation算子;foreach无返回值,且是action算子。
3.take是action算子,返回结果是一个Array,不需要collect,直接可以 foreach(println)。

更多推荐

Spark core详解系列四

本文发布于:2024-03-12 13:41:57,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1731640.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:详解   系列   Spark   core

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!