算子和血缘关系"/>
RDD行动算子和血缘关系
wordCount分布式运行
- 将wordCount进行打包上传,path使用args参数传参
- 启动hdfs和yarn
- 提交任务
[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
集群模式:client/cluster
/output
注意: 如果集群模式选择为cluster,代码中的local[*]必须改为yarn,否则会报错。
foreach和collect
foreach底层是多线程打印的,出现的结果是分区间有序,但是整体无序,如果需要有序,只能将分区数设置为1,可以节省一些流量。
collect是将各个Executor计算的结果聚合在一起,输出结果是整体有序的。
行动算子
算子名称 | 作用 |
---|---|
collect | 按顺序从各个executor中收取数据 |
foreach | 直接在各个executor中操作数据 |
count | 统计元素个数 |
first | 返回0号分区的第一个值 |
take(int n) | 根据参数个数提取数据 |
countByKey | rdd需要是kv对集合,根据key统计个数,返回map结果 |
save | 以特定格式,一般为textFile, 保存数据 |
Spark之Kryo序列化
适合简单数据的序列化,比hadoop的序列化更加轻量。使用方法如下:
- 替换默认的序列化机.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
- 注册序列化对象,参数为数组类型new 类名[]{类1,类2}
- 后面使用conf对象创建sc即可
// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")// 替换默认的序列化机制.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用kryo序列化的自定义类.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);
血缘关系
RDD.toDebugString().sout可以查看所有RDD的血缘关系。
打印的关系如下所示,其中(2)表示分区数量,±表示进行了shuffle,分区数量有所改变。如果进入了shuffle,计算时就会划分为多个阶段,即阶段数 = shuflle数量 +1。
常见的会走shffle的算子有reduceByKey和sortBy.
(2) ShuffledRDD[4] at reduceByKey at WordCount2.java:56 []+-(2) MapPartitionsRDD[3] at mapToPair at WordCount2.java:47 []| MapPartitionsRDD[2] at flatMap at WordCount2.java:36 []| input/1.txt MapPartitionsRDD[1] at textFile at WordCount2.java:33 []| input/1.txt HadoopRDD[0] at textFile at WordCount2.java:33 []
宽依赖:分区数量改变了,即走了shuffle
窄依赖:分区数量和分区规则不变,不走shuffle.
注: join操作如果是分桶join时,不需要走shuffle,其他普通join时则需要打散分区进行shuffle操作。
Stage任务划分
- 提交任务后,画出DAG有向无环图
- 划分阶段,阶段划分的标志是是否有Shuffle.
- 划分Task,任务运行的最小基本单位,按照分区进行划分, 每一份分区一个Task.
- 任务交给Worker即executor去执行,executor中的线程数等于CPU个数,每个CPU可以去执行一个Task。
- job划分,每执行一个行动算子就是一个job。如果算子计算过程中有sortBy算子,会划分为两个job。因为sortBy底层调用了collect, save行动算子,进行数据的落盘。
注意: 多个stage之间是串行执行的,stage间的Task是并行执行的。故分区越多,并发度越高。并不是调用了带有分区器的算子就一定会走shuffle,需要分区数量和分区规则变化时才会。
更多推荐
RDD行动算子和血缘关系
发布评论