Spark coalesce和repartition"/>
Spark coalesce和repartition
目录
- Spark coalesce和repartition功能
- Spark coalesce和repartition的联系与区别
- 案例演示
Spark coalesce和repartition功能
2者都是重新分区
coalesce:缩减分区
repartition:扩大分区
Spark coalesce和repartition的联系与区别
- 一般情况下repartition是增大分区的repartition,coalesce是减少分区的
- repartition底层调用的就是coalesce方法,源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}
- repartition算子默认执行Shuffle,更确切的来说repartition一定会发生shuffle;coalesce算子默认不执行Shuffle,但可以根据传入的参数来选择是否发生shuffle
- 若使用coalesce方法,不执行Shuffle
缺点:容易产生数据倾斜
- 若使用coalesce方法,执行Shuffle
缺点:速度慢
4. 使用filter、where等方法对数据进行频繁过滤时,就会造成大量小分区的生成,这时建议使用coalesce方法
案例演示
package com.xcu.bigdata.spark.core.pg02_rdd.pg022_rdd_transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @Desc : -coalesce缩减分区(默认是不执行shuffle,一般用于缩减分区)* -repartition扩大分区(底层调用的是coalesce,默认是执行shuffle,一般用于扩大分区)*/
object Spark_Coalesce_Repartition {def main(args: Array[String]): Unit = {//创建配置文件val conf: SparkConf = new SparkConf().setAppName("Spark09_Coalesce_Repartition").setMaster("local[*]")//创建SparkContext,该对象是提交的入口val sc = new SparkContext(conf)//创建RDDval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), numSlices = 3)//获取分区数println("初始分区数: " + rdd.getNumPartitions)//缩减分区(默认情况:不走shuffle)val newRDD: RDD[Int] = rdd.coalesce(numPartitions = 2)//缩减分区(开启shuffle)//val newRDD: RDD[Int] = rdd.coalesce(numPartitions = 2, shuffle = true)println("缩减后的分区个数: " + newRDD.getNumPartitions)//扩大分区(默认情况:开启shuffle)val newRDD1: RDD[Int] = rdd.repartition(6)println("扩大后的分区个数: " + newRDD1.getNumPartitions)//释放资源sc.stop()}
}
更多推荐
Spark coalesce和repartition
发布评论