SparkCore排序之-----基本排序"/>
SparkCore排序之-----基本排序
SparkCore实现排序功能
- 基本数据类型排序
- 自定义的类型排序
- 隐式转换实现排序
- 优雅方式实现排序
基本数据类型排序
对于Double、Int 等类型,Spark中默认是实现了排序的功能,我们可以直接使用SortBy进行排序,将我们所需要的字段传递进去,第二个参数默认是false
object SortApp01 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))// 按照价格降序排列val productData: RDD[(String, Double, Int)] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})// 需要加上collect才生效productData.sortBy(_._2, false).collect().foreach(println)sc.stop()}
}
按照两个字段进行排序时候,在sortby里面传递进去一个tuple即可
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))// 按照价格降序排列val productData: RDD[(String, Double, Int)] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})// 价格降序排列,如果价格相同则按照库存降序排列// 多个字段的排序,在sortBy字段中可以传入一个tuple,可以按照tuple排列productData.sortBy(x => {(-x._2, -x._3)}).collect().foreach(println)sc.stop()
}
自定义的类型排序
自定义类型进行排序的时候可以使用普通的类也可以使用case class进行排序,使用普通的类时候需要实现Serializable接口,case calss则直接使用即可,这两种方式都要实现Ordered接口,重写compare to 方法。
object SortApp03 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))val productData: RDD[ProductCassClass] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt// new Product(name, price, number)ProductCassClass(name, price, number)})// 价库存的升序进行排序// 使用类的方式进行排序productData.sortBy(x => x).collect().foreach(println)sc.stop()}
}
class Product(val name: String, val price: Double, val number: Int) extends Ordered[Product] with Serializable {override def toString: String = {name + "," + price + "," + number}// 在此处实现我们的排序规则override def compare(that: Product): Int = {this.number - that.number}
}// case calss 不需要实现Serializable
case class ProductCassClass(name: String, price: Double, number: Int) extends Ordered[ProductCassClass] {override def toString: String = {"case" + name + "," + price + "," + number}// 在此处实现我们的排序规则override def compare(that: ProductCassClass): Int = {this.number - that.number}
}
隐式转换实现排序
这里可以采用隐式转换来实现排序逻辑,可以定义隐式变量也可以定义隐式方法实现排序逻辑
object SortApp04 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp04").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛烛 20 1000", "高清电视 1000 20", "NBA篮球 200 300"))val productData: RDD[ProductInfo] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt// new Product(name, price, number)new ProductInfo(name, price, number)})// 优先级:隐式变量> 自定义类> 隐式方法// 使用隐式方法实现排序implicit def productInfo2Ordered(productInfo: ProductInfo): Ordered[ProductInfo] = new Ordered[ProductInfo] {override def compare(that: ProductInfo): Int = {if (productInfo.price - that.price > 0) 1 else -1}}// 使用隐式变量实现排序implicit val productInfo2OrderingVariable: Ordering[ProductInfo] = new Ordering[ProductInfo] {override def compare(x: ProductInfo, y: ProductInfo): Int = {y.number - x.number}}// 使用类的方式进行排序,使用自己封装的print// print 传入0或者不传递表示打印,不是0则不打印productData.sortBy(x => x).print(0)sc.stop()}
}// 不用compare方法使用排序功能
class ProductInfo(val name: String, val price: Double, val number: Int) extends Serializable with Ordered[ProductInfo] {override def toString: String = {"ProductInfo" + name + "," + price + "," + number}override def compare(that: ProductInfo): Int = {this.name.length - that.name.length}
}
优雅方式实现排序
使用Ordering的on方法简化我们的排序逻辑,Ordering方法的泛型为排序返回的类型,on方法的参数是进去的数据类型,在on方法中实现排序逻辑
object SortApp05 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp05").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val producto = sc.parallelize(List("皮鞭,20,10", "蜡烛,20,1000", "电视,1000,1000", "扑克,5,2000"))val productDatas = producto.map(x => {val splits = x.split(",")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})/*** (Double, Int) 定义排序规则返回的类型* (String, Double, Int) 进来的数据类型* x=> (-x._2,-x._3)*/implicit val ord = Ordering[(Double, Int)].on[(String, Double, Int)](x => (-x._2, -x._3))productDatas.sortBy(x => x).foreach(println)sc.stop()}
}
更多推荐
SparkCore排序之-----基本排序
发布评论