spark常见转换算子(transformation)的操作
package com.meng.nan.day717

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

/**
 * Demonstrates common RDD transformation operators:
 * flatMap, map, filter, union and sample.
 */
object flatMapClass {

  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so the operator output stays readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("flatMapClass").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // flatMap: explode each string into one (char, 1) pair per character.
    val list = List("safdasd")
    val rdd: RDD[String] = sc.parallelize(list)
    val chrdd: RDD[(Char, Int)] = rdd.flatMap { str =>
      val ab = ArrayBuffer[(Char, Int)]()
      for (ch <- str) {
        ab.append((ch, 1))
      }
      ab
    }
    // chrdd.foreach(ch => println(ch))

    // map: one-to-one mapping over the numbers 1..9.
    val list1 = 1 to 9
    val lsrdd: RDD[Int] = sc.parallelize(list1)
    // val mapRDD = lsrdd.map(num => num * 7)
    // println("partitions" + mapRDD.getNumPartitions)
    // mapRDD.foreach(println)
    // val mapRDD = lsrdd.map(num => (num, 1))
    // val mapRDD = lsrdd.map((_, 1))
    // for (sc <- mapRDD) {
    //   println(sc)
    // }

    // filter: keep only even numbers.
    // val flterRdd = lsrdd.filter(fl => fl % 2 == 0)
    // flterRdd.foreach(println)

    // union: concatenation of two RDDs (duplicates are kept).
    val list2 = List(3, 1, 2, 2, 11, 22, 33, 44)
    val lr: RDD[Int] = sc.parallelize(list2)
    val unRDD: RDD[Int] = lsrdd.union(lr)
    unRDD.foreach(println)

    // sample: random sampling without replacement, expected fraction 0.2.
    // val smRDD = lsrdd.sample(false, 0.2)
    // smRDD.foreach(println)
    // println(smRDD.count())

    // Release cluster resources; the original omitted this.
    sc.stop()
  }
}
package com.meng.nan.day717

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * Demonstrates the join transformations.
 * Every join requires pair RDDs of type (K, V), where K is the join key.
 */
object JionClass {

  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so the join output stays readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[*]").setAppName("JionClass")
    val sc = new SparkContext(conf)
    jionOps(sc)
    // Release cluster resources; the original omitted this.
    sc.stop()
  }

  /**
   * Runs inner, left-outer and full-outer joins between a student table
   * and a score table keyed by student id.
   */
  def jionOps(sc: SparkContext): Unit = {
    // Student rows: "<sid> <name> <age> <class>".
    val stu = List(
      "1 刘冯曼娜 22 bd-1901-bj",
      "2 常师 25 bd-1901-bj",
      "3 张熹 24 bd-1901-sz",
      "4 胡盼 18 bd-1901-wh"
    )
    // Score rows: "<id> <sid> <course> <score>". sid 10 has no matching student.
    val scores = List(
      "1 1 math 82",
      "2 1 english 0",
      "3 2 chinese 85.5",
      "4 3 PE 99",
      "5 10 math 99"
    )
    val stuRDD: RDD[String] = sc.parallelize(stu)
    val scoresRDD: RDD[String] = sc.parallelize(scores)

    // Student line -> (sid, rest-of-line).
    // NOTE(review): substring(0, 1) assumes single-digit student ids — confirm.
    val stuMapRDD: RDD[(String, String)] = stuRDD.map { stringLine =>
      val sid = stringLine.substring(0, 1)
      val stuInf = stringLine.substring(1).trim
      (sid, stuInf)
    }

    // Score line -> (sid, "course score"); field 0 is the row id, field 1 the sid.
    val scoresMapRDD: RDD[(String, String)] = scoresRDD.map { scorLine =>
      val fields = scorLine.split("\\s+")
      val sid = fields(1)
      val scoreInfo = fields(2) + " " + fields(3)
      (sid, scoreInfo)
    }

    // Inner join: only sids present in both tables.
    val joinRDD: RDD[(String, (String, String))] = stuMapRDD.join(scoresMapRDD)
    joinRDD.foreach { case (sid, (stuInf, scoreInfo)) =>
      println(s"sid:${sid}\tstuInf:${stuInf}\tscoreInfo:${scoreInfo}")
    }

    println("======================")

    // Left outer join: every student, score wrapped in Option.
    val leftJoin: RDD[(String, (String, Option[String]))] = stuMapRDD.leftOuterJoin(scoresMapRDD)
    leftJoin.foreach { case (sid, (stuInf, scoreInfo)) =>
      // Fixed: original message read "sid${sid}", inconsistent with the other joins.
      println(s"sid:${sid}\tstuInf:${stuInf}\tscoreInfo:${scoreInfo}")
    }

    println("================================")

    // Full outer join: union of keys, both sides wrapped in Option.
    val fullMapJoin: RDD[(String, (Option[String], Option[String]))] = stuMapRDD.fullOuterJoin(scoresMapRDD)
    fullMapJoin.foreach { case (sid, (stuInf, scoreInfo)) =>
      println(s"sid:${sid}\tstuInfo:${stuInf}\tscoreInfo:${scoreInfo}")
    }
  }
}
更多推荐
spark常见转换算子(transformation)的操作
发布评论