RDD行动算子和血缘关系

编程入门 行业动态 更新时间:2024-10-28 14:33:18

RDD行动<a href=https://www.elefans.com/category/jswz/34/1748093.html style=算子和血缘关系"/>

RDD行动算子和血缘关系

wordCount分布式运行

  1. 将wordCount进行打包上传,path使用args参数传参
  2. 启动hdfs和yarn
  3. 提交任务
[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
集群模式:client/cluster
/output

注意: 如果集群模式选择为cluster,代码中的local[*]必须改为yarn,否则会报错。

foreach和collect

foreach底层是多线程打印的,出现的结果是分区间有序,但是整体无序,如果需要有序,只能将分区数设置为1,可以节省一些流量。
collect是将各个Executor计算的结果聚合在一起,输出结果是整体有序的。

行动算子

算子名称作用
collect按顺序从各个executor中收取数据
foreach直接在各个executor中操作数据
count统计元素个数
first返回0号分区的第一个值
take(int n)根据参数个数提取数据
countByKeyrdd需要是kv对集合,根据key统计个数,返回map结果
save以特定格式,一般为textFile, 保存数据

Spark之Kryo序列化

适合简单数据的序列化,比hadoop的序列化更加轻量。使用方法如下:

  1. 替换默认的序列化机.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
  2. 注册序列化对象,参数为数组类型new 类名[]{类1,类2}
  3. 后面使用conf对象创建sc即可
// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")// 替换默认的序列化机制.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用kryo序列化的自定义类.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);

血缘关系

RDD.toDebugString().sout可以查看所有RDD的血缘关系。
打印的关系如下所示,其中(2)表示分区数量,±表示进行了shuffle,分区数量有所改变。如果进入了shuffle,计算时就会划分为多个阶段,即阶段数 = shuflle数量 +1。

常见的会走shffle的算子有reduceByKey和sortBy.

(2) ShuffledRDD[4] at reduceByKey at WordCount2.java:56 []+-(2) MapPartitionsRDD[3] at mapToPair at WordCount2.java:47 []|  MapPartitionsRDD[2] at flatMap at WordCount2.java:36 []|  input/1.txt MapPartitionsRDD[1] at textFile at WordCount2.java:33 []|  input/1.txt HadoopRDD[0] at textFile at WordCount2.java:33 []

宽依赖:分区数量改变了,即走了shuffle
窄依赖:分区数量和分区规则不变,不走shuffle.

注: join操作如果是分桶join时,不需要走shuffle,其他普通join时则需要打散分区进行shuffle操作。

Stage任务划分

  1. 提交任务后,画出DAG有向无环图
  2. 划分阶段,阶段划分的标志是是否有Shuffle.
  3. 划分Task,任务运行的最小基本单位,按照分区进行划分, 每一份分区一个Task.
  4. 任务交给Worker即executor去执行,executor中的线程数等于CPU个数,每个CPU可以去执行一个Task。
  5. job划分,每执行一个行动算子就是一个job。如果算子计算过程中有sortBy算子,会划分为两个job。因为sortBy底层调用了collect, save行动算子,进行数据的落盘。

注意: 多个stage之间是串行执行的,stage间的Task是并行执行的。故分区越多,并发度越高。并不是调用了带有分区器的算子就一定会走shuffle,需要分区数量和分区规则变化时才会。

更多推荐

RDD行动算子和血缘关系

本文发布于:2023-12-07 15:13:14,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1671522.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:算子   血缘关系   RDD

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!