Common Spark transformation operators

This post walks through the common Spark transformation operators, map, flatMap, filter, union, sample, and the join family, with runnable Scala examples.

package com.meng.nan.day717

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

// Common transformation operators: map, flatMap, filter, union, sample, etc.
object flatMapClass {
  def main(args: Array[String]): Unit = {
    // keep the console output quiet
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("flatMapClass").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // flatMap: explode each string into (char, 1) pairs
    val list = List("safdasd")
    val rdd: RDD[String] = sc.parallelize(list)
    val chrdd: RDD[(Char, Int)] = rdd.flatMap(str => {
      val ab = ArrayBuffer[(Char, Int)]()
      for (ch <- str) {
        ab.append((ch, 1))
      }
      ab
    })
//    chrdd.foreach(ch => println(ch))

    // map: one-to-one mapping
    val list1 = 1 to 9
    val lsrdd: RDD[Int] = sc.parallelize(list1)
//    val mapRDD = lsrdd.map(num => num * 7)
//    println("partitions" + mapRDD.getNumPartitions)
//    mapRDD.foreach(println)
//    val mapRDD = lsrdd.map(num => (num, 1))
//    val mapRDD = lsrdd.map((_, 1))
//    for (pair <- mapRDD) {
//      println(pair)
//    }

    // filter: keep only the elements matching a predicate (here: even numbers)
//    val flterRdd = lsrdd.filter(fl => fl % 2 == 0)
//    flterRdd.foreach(println)

    // union: concatenate two RDDs (duplicates are kept)
    val list2 = List(3, 1, 2, 2, 11, 22, 33, 44)
    val lr: RDD[Int] = sc.parallelize(list2)
    val unRDD: RDD[Int] = lsrdd.union(lr)
    unRDD.foreach(println)

    // sample(withReplacement, fraction): draw a random sample
//    val smRDD = lsrdd.sample(false, 0.2)
//    smRDD.foreach(println)
//    println(smRDD.count())
  }
}
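
A note on execution: every operator used above (map, flatMap, filter, union, sample) is a transformation, so it only records lineage; nothing runs until an action such as foreach, count, or collect is called. A minimal sketch, assuming the same local[*] setup (the object name LazyDemo is ours, not from the original code):

package com.meng.nan.day717

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: transformations are lazy, actions trigger the job.
object LazyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyDemo").setMaster("local[*]"))
    val lsrdd = sc.parallelize(1 to 9)
    val mapped = lsrdd.map(_ * 7)             // transformation: no job has run yet
    println(mapped.toDebugString)             // prints the lineage, still no job
    println(mapped.collect().mkString(", "))  // collect() is an action: the job runs here
    sc.stop()
  }
}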
package com.meng.nan.day717

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// join operators: every join requires pair RDDs of type (K, V), where K is the join key
object JionClass {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[*]").setAppName("JionClass")
    val sc = new SparkContext(conf)
    jionOps(sc)
  }

  def jionOps(sc: SparkContext): Unit = {
    // student records: "sid name age class"
    val stu = List(
      "1 刘冯曼娜 22 bd-1901-bj",
      "2 常师 25 bd-1901-bj",
      "3 张熹 24 bd-1901-sz",
      "4 胡盼 18 bd-1901-wh"
    )
    // score records: "id sid subject score"
    val scores = List(
      "1 1 math 82",
      "2 1 english 0",
      "3 2 chinese 85.5",
      "4 3 PE 99",
      "5 10 math 99"
    )
    val stuRDD: RDD[String] = sc.parallelize(stu)
    val scoresRDD: RDD[String] = sc.parallelize(scores)

    // key the student table by sid (ids are single digits in this sample)
    val stuMapRDD: RDD[(String, String)] = stuRDD.map(stringLine => {
      val sid = stringLine.substring(0, 1)
      val stuInf = stringLine.substring(1).trim
      (sid, stuInf)
    })
    // key the score table by sid
    val scoresMapRDD: RDD[(String, String)] = scoresRDD.map(scorLine => {
      val fields = scorLine.split("\\s+")
      val sid = fields(1)
      val scoreInfo = fields(2) + " " + fields(3)
      (sid, scoreInfo)
    })

    // inner join: only sids present in both tables
    val joinRDD: RDD[(String, (String, String))] = stuMapRDD.join(scoresMapRDD)
    joinRDD.foreach { case (sid, (stuInf, scoreInfo)) =>
      println(s"sid:${sid}\tstuInf:${stuInf}\tscoreInfo:${scoreInfo}")
    }
    println("======================")

    // left outer join: every student appears; the score side is an Option
    val leftJoin: RDD[(String, (String, Option[String]))] = stuMapRDD.leftOuterJoin(scoresMapRDD)
    leftJoin.foreach { case (sid, (stuInf, scoreInfo)) =>
      println(s"sid:${sid}\tstuInf:${stuInf}\tscoreInfo:${scoreInfo}")
    }
    println("================================")

    // full outer join: both sides are Options
    val fullMapJoin: RDD[(String, (Option[String], Option[String]))] = stuMapRDD.fullOuterJoin(scoresMapRDD)
    fullMapJoin.foreach { case (sid, (stuInf, scoreInfo)) =>
      println(s"sid:${sid}\tstuInfo:${stuInf}\tscoreInfo:${scoreInfo}")
    }
  }
}
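
For this sample data, the inner join emits four records (sid 1 matches both of its score rows, sids 2 and 3 one each); the left outer join adds sid 4, who has no scores, with scoreInfo None; the full outer join also surfaces sid 10, which has a score but no student row, as (None, Some(math 99)). Output order varies from run to run because foreach executes per partition.

Since the missing side of an outer join comes back as an Option, it is often cleaner to unwrap it with getOrElse before printing. A minimal self-contained sketch (the object name OptionJoinDemo and its two-row datasets are ours, not from the original post):

package com.meng.nan.day717

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: render the Option sides of a full outer join with defaults.
object OptionJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OptionJoinDemo").setMaster("local[*]"))
    val stu = sc.parallelize(Seq(("1", "刘冯曼娜"), ("4", "胡盼")))      // placeholder rows
    val score = sc.parallelize(Seq(("1", "math 82"), ("10", "math 99")))
    stu.fullOuterJoin(score).foreach { case (sid, (nameOpt, scoreOpt)) =>
      println(s"sid:$sid\tname:${nameOpt.getOrElse("<none>")}\tscore:${scoreOpt.getOrElse("<none>")}")
    }
    sc.stop()
  }
}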
