Spark partitionBy"/>
Spark partitionBy
目录
- Spark partitionBy 功能
- 案例演示
Spark partitionBy 功能
按照key重新分区
案例演示
rdd: RDD[(Int, String)] = sc.makeRDD(List((1, “aaa”), (2, “bbb”), (3, “ccc”)), numSlices = 3)
需求1:按照key的奇偶存放数据到不同分区(要求使用默认分区器)
结果如下:
0—>(2,bbb)
1—>(1,aaa),(3,ccc)需求2:将所有数据都存放到1号分区(要求自定义分区器)
结果如下:
2—>
0—>
1—>(1,aaa),(2,bbb),(3,ccc)
package com.xcu.bigdata.spark.core.pg02_rdd.pg022_rdd_transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}/*** @Desc : 按照Key重新分区* @note : RDD本身是没有partitionBy这个算子的,通过隐式转换动态给kv类型的RDD扩展的功能*/
object Spark12_PartitionBy {def main(args: Array[String]): Unit = {//创建配置文件val conf: SparkConf = new SparkConf().setAppName("Spark12_PartitionBy").setMaster("local[*]")//创建SparkContext,该对象是提交的入口val sc = new SparkContext(conf)//创建RDDval rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), numSlices = 3)//先按照分区打印分布rdd.mapPartitionsWithIndex {(index, datas) => {println(index + "--->" + datas.mkString(","))datas}}.collect()println("********************************************")//方式1 采用默认分区器val newRDD1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))//查看分区后的数据分布newRDD1.mapPartitionsWithIndex {(index, datas) => {println(index + "--->" + datas.mkString(","))datas}}.collect()println("********************************************")//方式2 自定义分区器val newRDD2: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(3))//查看分区后的数据分布newRDD2.mapPartitionsWithIndex {(index, datas) => {println(index + "--->" + datas.mkString(","))datas}}.collect()//释放资源sc.stop()}
}class MyPartitioner(i: Int) extends Partitioner {//获取分区的个数override def numPartitions: Int = i//指定分区规则 返回值Int表示分区编号,从0开始override def getPartition(key: Any): Int = {//这里的代码逻辑和mapreduce类似1 //把所有数据存放到1号分区}
}
更多推荐
Spark partitionBy
发布评论