SparkCore排序之-----基本排序

编程入门 行业动态 更新时间:2024-10-22 21:33:33

<a href=https://www.elefans.com/category/jswz/34/1675615.html style=SparkCore排序之-----基本排序"/>

SparkCore排序之-----基本排序

SparkCore实现排序功能

        • 基本数据类型排序
        • 自定义的类型排序
        • 隐式转换实现排序
        • 优雅方式实现排序

基本数据类型排序

对于Double、Int 等类型,Spark中默认是实现了排序的功能,我们可以直接使用SortBy进行排序,将我们所需要的字段传递进去,第二个参数默认是false

object SortApp01 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))// 按照价格降序排列val productData: RDD[(String, Double, Int)] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})// 需要加上collect才生效productData.sortBy(_._2, false).collect().foreach(println)sc.stop()}
}

按照两个字段进行排序时候,在sortby里面传递进去一个tuple即可

def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))// 按照价格降序排列val productData: RDD[(String, Double, Int)] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})// 价格降序排列,如果价格相同则按照库存降序排列// 多个字段的排序,在sortBy字段中可以传入一个tuple,可以按照tuple排列productData.sortBy(x => {(-x._2, -x._3)}).collect().foreach(println)sc.stop()
}

自定义的类型排序

自定义类型进行排序的时候可以使用普通的类也可以使用case class进行排序,使用普通的类时候需要实现Serializable接口,case calss则直接使用即可,这两种方式都要实现Ordered接口,重写compare to 方法。

object SortApp03 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛 20 1000", "电视 1000 20", "篮球 200 300"))val productData: RDD[ProductCassClass] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt//      new Product(name, price, number)ProductCassClass(name, price, number)})// 价库存的升序进行排序// 使用类的方式进行排序productData.sortBy(x => x).collect().foreach(println)sc.stop()}
}
class Product(val name: String, val price: Double, val number: Int) extends Ordered[Product] with Serializable {override def toString: String = {name + "," + price + "," + number}// 在此处实现我们的排序规则override def compare(that: Product): Int = {this.number - that.number}
}// case calss 不需要实现Serializable
case class ProductCassClass(name: String, price: Double, number: Int) extends Ordered[ProductCassClass] {override def toString: String = {"case" + name + "," + price + "," + number}// 在此处实现我们的排序规则override def compare(that: ProductCassClass): Int = {this.number - that.number}
}

隐式转换实现排序

这里可以采用隐式转换来实现排序逻辑,可以定义隐式变量也可以定义隐式方法实现排序逻辑

object SortApp04 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp04").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val product: RDD[String] = sc.parallelize(List("皮鞭 20 10", "蜡烛烛 20 1000", "高清电视 1000 20", "NBA篮球 200 300"))val productData: RDD[ProductInfo] = product.map(x => {val splits = x.split(" ")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt//      new Product(name, price, number)new ProductInfo(name, price, number)})// 优先级:隐式变量> 自定义类> 隐式方法// 使用隐式方法实现排序implicit def productInfo2Ordered(productInfo: ProductInfo): Ordered[ProductInfo] = new Ordered[ProductInfo] {override def compare(that: ProductInfo): Int = {if (productInfo.price - that.price > 0) 1 else -1}}// 使用隐式变量实现排序implicit val productInfo2OrderingVariable: Ordering[ProductInfo] = new Ordering[ProductInfo] {override def compare(x: ProductInfo, y: ProductInfo): Int = {y.number - x.number}}// 使用类的方式进行排序,使用自己封装的print// print 传入0或者不传递表示打印,不是0则不打印productData.sortBy(x => x).print(0)sc.stop()}
}// 不用compare方法使用排序功能
class ProductInfo(val name: String, val price: Double, val number: Int) extends Serializable with Ordered[ProductInfo] {override def toString: String = {"ProductInfo" + name + "," + price + "," + number}override def compare(that: ProductInfo): Int = {this.name.length - that.name.length}
}

优雅方式实现排序

使用Ordering的on方法简化我们的排序逻辑,Ordering方法的泛型为排序返回的类型,on方法的参数是进去的数据类型,在on方法中实现排序逻辑

object SortApp05 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SortApp05").setMaster("local[2]")val sc = new SparkContext(conf)// 商品 价格 库存val producto = sc.parallelize(List("皮鞭,20,10", "蜡烛,20,1000", "电视,1000,1000", "扑克,5,2000"))val productDatas = producto.map(x => {val splits = x.split(",")val name = splits(0)val price = splits(1).toDoubleval number = splits(2).toInt(name, price, number)})/*** (Double, Int) 定义排序规则返回的类型* (String, Double, Int) 进来的数据类型* x=> (-x._2,-x._3)*/implicit val ord = Ordering[(Double, Int)].on[(String, Double, Int)](x => (-x._2, -x._3))productDatas.sortBy(x => x).foreach(println)sc.stop()}
}

更多推荐

SparkCore排序之-----基本排序

本文发布于:2023-07-28 20:16:03,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1298274.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:SparkCore

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!