数据处理技术Spark》"/>
《大数据处理技术Spark》
从林子雨老师的网课上学到的东西,林老师讲的特别清晰,记录一下,防止忘记。
以下是资料的链接:
- hadoop安装
- 课程
- 课件链接
其他资料:
- Spark-SQL之DataFrame操作大全
文章目录
- 1. 概述
- 大数据关键技术
- 大数据计算模式:
- 代表性大数据技术之Hadoop
- 代表性大数据技术之Spark
- 代表性大数据技术之Flink
- 代表性大数据技术之Beam
- hadoop伪分布实例
- 2. scala 语法
- 5. RDD
- 5.1 RDD编程
- 5.1.1 RDD创建
- 5.1.1.1 从文件系统中加载数据创建RDD
- 5.1.1.2 通过并行集合(数组)创建RDD
- 5.1.2 RDD操作
- 5.1.2.1 转换操作
- 5.1.2.2 行动操作
- 5.1.2.3 惰性机制
- 5.1.2.4 实例
- 5.1.3 持久化
- 5.1.4 分区
- 5.1.5 打印元素
- 5.2 Pair RDD
- 5.2.1 Pair RDD的创建
- 5.2.2 常用的Pair RDD转换操作
- 5.2.3 一个综合实例
- 5.3 共享变量
- 5.4 数据读写
- 5.4.1 文件数据读写
- 5.4.1.1 本地文件系统的数据读写
- 5.4.1.2 分布式文件系统HDFS的数据读写
- 5.4.1.3 JSON文件的数据读写
- 5.4.2 读写HBase数据
- 5.5 WordCount程序解析
- 5.6 综合案例
- 6 Spark SQL
- 6.1 简介
- 6.2 DataFrame和RDD区别
- 6.3 DataFrame的创建
- 6.4 从RDD到DF
- 6.4.1 利用反射机制推断RDD模式
- 6.4.2 使用编程方式定义RDD模式
- 6.4.3 把RDD保存成文件
- 6.5 读取和保存数据
- 6.5.1 读写Parquet
- 6.5.2 通过JDBC连接数据库
- Ubuntu安装MySQL及常用操作
- 6.5.3 连接Hive读写数据
- 6.5.3.1 Hive简介和安装
- 6.5.3.2 让Spark包含Hive支持
- 6.5.3.3 在Hive中创建数据库和表
- 6.5.3.4 连接Hive读写数据
1. 概述
关系型数据库和非关系型数据库
五种主流的大数据计算框架:
- Hadoop框架
由于Hadoop的计算任务需要在集群的多个节点上多次读写,因此在速度上会稍显劣势,但是其吞吐量也同样是其他框架所不能匹敌的。 - Storm框架
与Hadoop的批处理模式不同,Storm采用的是流计算框架,由Twitter开源并且托管在GitHub上。与Hadoop类似的是,Storm也提出了两个计算角色,分别为Spout和Bolt。如果说Hadoop是水桶,只能一桶一桶的去井里扛,那么Storm就是水龙头,只要打开就可以源源不断的出水。Storm支持的语言也比较多,Java、Ruby、Python等语言都能很好的支持。由于Storm是流计算框架,因此使用的是内存,延迟上有极大的优势,但是Storm不会持久化数据。 - Samza框架
对于已经有Hadoop+Kafka工作环境的团队来说,Samza是一个不错的选择,并且Samza在多个团队使用的时候能体现良好的性能。 - Spark框架
现阶段的Spark和Hadoop搭配起来使用更加合适。 - Flink框架
Flink也是一种混合式的计算框架,但是在设计初始,Fink的侧重点在于处理流式数据,这与Spark的设计初衷恰恰相反,而在市场需求的驱使下,两者都在朝着更多的兼容性发展。Flink目前不是很成熟,更多情况下Flink还是起到一个借鉴的作用。
大数据关键技术
- 分布式存储:
- Google提出了GFS,Hadoop开源实现了GFS,叫做HDFS
- Google又提出了BigTable,Hadoop开源实现了BigTable,叫做HBase
- 分布式处理:
大数据计算模式:
代表性大数据技术之Hadoop
Hive:会将sql语句转成底层的MapReduce任务
zookeeper:帮助选择主节点等
HBase:存储关系数据
Sqoop:完成从关系型数据库和hadoop数据之间的导入导出
Ambari:可视化的部署等等都归它管理
MapReduce:可以将程序分发到不同的机器上(计算向数据靠拢)
- 缺点:reduce任务必须要等待所有的map任务完成之后才能进行
- 缺点:每次数据都要写磁盘
Yarn:资源调度和管理框架,帮助调动底层cpu和内存资源用的
代表性大数据技术之Spark
最热门的主流技术。可以和hadoop兼容:可以读取HDFS,Hive/HDbase兼容;可以和noSQl兼容
spark克服了Hadoop的MapReduce操作的缺陷:
- 基于磁盘的计算,读写磁盘代价高。map和reduce都要读写磁盘,迭代计算开销大
- 延迟高。map没结束,reduce不能开启;读写磁盘慢
- 只有Map和reduce,有一些功能不能实现
spark在MapReduce的基础上的改进:
- 除了map/reduce外还提供了很多函数,像是filter、join、groupBy等
- 基于内存的计算
- 基于DAG的任务调度机制,优于MapReduce的迭代
代表性大数据技术之Flink
flink性能好,为什么没有spark火?“既生瑜何生亮”+_+
代表性大数据技术之Beam
Beam提供了统一的编程接口起来,可以帮助转换成spark/hadoop/flink
hadoop伪分布实例
伪分布式读取的则是 HDFS 上的数据。要使用 HDFS,首先需要在 HDFS 中创建用户目录:
hdfs dfs -mkdir -p /user/hadoop # 已经将hadoop中的bin加入到环境变量中
将本地的word.txt
复制到分布式文件系统的/user/hadoop/input
中
hdfs dfs -mkdir input # 因为现在使用的是hadoop用户,因此可以使用相对路径
hdfs dfs -put ./word.txt input # put
hdfs dfs -ls input # 可以查看文件列表
将hadoop的运行结果取回到本地
hdfs dfs -get output ./output # get
cat ./output/* # cat查看
这里hdfs dfs
可以换成:
hadoop fs
:适用于任何不同的文件系统,比如本地文件系统和HDFS文件系统hadoop dfs
:只能适用于HDFS文件系统hdfs dfs
:跟hadoop dfs的命令作用一样,也只能适用于HDFS文件系统
2. scala 语法
写到了scala learning中
5. RDD
5.1 RDD编程
rdd编程-林子雨老师
5.1.1 RDD创建
5.1.1.1 从文件系统中加载数据创建RDD
-
Spark采用textFile()方法来从文件系统中加载数据创建RDD
-
该方法把文件的URI作为参数,这个URI可以是:
-
本地文件系统的地址
val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
-
或者是分布式文件系统HDFS的地址
val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") val lines = sc.textFile("/user/hadoop/word.txt") val lines = sc.textFile("word.txt")
-
或者是Amazon S3的地址等等
-
5.1.1.2 通过并行集合(数组)创建RDD
-
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合 (数组)上创建。
val array = Array(1,2,3,4,5) val rdd = sc.parallelize(array)val list = List(1,2,3,4,5) val rdd = sc.parallelize(list)
5.1.2 RDD操作
5.1.2.1 转换操作
-
filter(func):筛选出满足函数func的元素,并返回一个新的数据集
lines.filter(line => line.contains("Spark")).count()
-
map(func):将每个元素传递到函数func中,并将结果返回为 一个新的数据集
-
flatMap(func):与map()相似,但每个输入元素都可以映射到 0或多个输出结果
-
groupByKey():应用于(K,V)键值对的数据集时,返回一个新 的(K, Iterable)形式的数据集
-
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一 个新的(K, V)形式的数据集,其中的每个值是将每个key传递到 函数func中进行聚合
5.1.2.2 行动操作
- count() 返回数据集中的元素个数
- collect() 以数组的形式返回数据集中的所有元素
- first() 返回数据集中的第一个元素
- take(n) 以数组的形式返回数据集中的前n个元素
- reduce(func) 通过函数func(输入两个参数并返回一个值) 聚合数据集中的元素
- foreach(func) 将数据集中的每个元素传递到函数func中运行
5.1.2.3 惰性机制
只有动作类型的操作才能真正触发计算
5.1.2.4 实例
找出文本文件中单行文本所包含的单词数量的最大值
val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
5.1.3 持久化
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
- 可以通过持久化(缓存)机制避免这种重复计算的开销。
- 可以使用
persist()
方法对一个RDD标记为持久化 - 之所以说“标记为持久化”,是因为出现persist()语句的地 方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化
- 持久化后的RDD将会被保留在计算节点的内存中被后面的 行动操作重复使用
persist()的圆括号中包含的是持久化级别参数:
persist(MEMORY_ONLY)
:表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容persist(MEMORY_AND_DISK)
表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上- 一般而言,使用
cache()
方法时,会调用 persist(MEMORY_ONLY) - 可以使用
unpersist()
方法手动地把持久化的RDD从缓存中移除
val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache()
println(rdd.count()) // 第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
5.1.4 分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
为什么要分区?(1)增加并行度 (2)减少通信开销
-
原则:使得分区的个数尽量等于集群中的CPU核心 (core)数目。
-
默认分区数:对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模 式、Mesos模式),都可以通过设置
spark.default.parallelism
这个参数 的值,来配置默认的分区数目,一般而言:- 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
- Apache Mesos:默认的分区数为8
- Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者 中取较大值作为默认值
-
手动设置分区:
-
创建 RDD 时:在调用
textFile
和parallelize
方法时候手动指定 分区个数即可,语法格式:sc.textFile(path, partitionNum)
val rdd = sc.parallelize(array,2)
-
通过转换操作得到新 RDD 时:直接调用
repartition
方法即可var rdd2 = data.repartition(4) rdd2.partitions.size
-
-
自定义分区
import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class UsridPartitioner(numParts:Int) extends Partitioner{//覆盖分区数override def numPartitions: Int = numParts//覆盖分区号获取函数override def getPartition(key: Any): Int = {key.toString.toInt%10 } } object Test {def main(args: Array[String]) {val conf=new SparkConf()val sc=new SparkContext(conf)//模拟5个分区的数据val data=sc.parallelize(1 to 10,5)//根据尾号转变为10个分区,分写到10个文件data.map((_,1)).partitionBy(new UsridPartitioner(10)).map(_._1).saveAsTextFile("/chenm/partition") //占位符 _} }
5.1.5 打印元素
- 在实际编程中,经常需要把RDD中的元素打印输出到屏幕上(标准 输出stdout),一般会采用语句
rdd.foreach(println)
或者rdd.map(println)
- •当采用本地模式(local)在单机上执行时,这些语句会打印出一个 RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上 执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制 节点Driver Program中,因此,任务控制节点Driver Program中的 stdout是不会显示打印语句的这些输出内容的
- 为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法,比如,
rdd.collect().foreach(println)
,但是,由于collect()方法会把各个 worker节点上的所有RDD元素都抓取到Driver Program中,因此,这 可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可 以采用语句rdd.take(100).foreach(println)
概括下来是:
- 本地模式:
rdd.foreach(println)
或者rdd.map(println)
- 集群:
rdd.take(100).foreach(println)
5.2 Pair RDD
5.2.1 Pair RDD的创建
- 第一种创建方式:从文件中加载
- 第二种创建方式:通过并行集合(数组)创建RDD
5.2.2 常用的Pair RDD转换操作
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
-
reduceByKey(func)
:reduceByKey(func)的功能是,使用func函数合并具有相同键的值pairRDD.reduceByKey((a,b)=>a+b).foreach(println) // (Spark,2) (Hive,1) (Hadoop,1)
-
groupByKey()
:对具有相同键的值进行分组pairRDD.groupByKey() // ("spark",(1,2))和("hadoop",(3,5))
-
reduceByKey和groupByKey的区别
- reduceByKey用于对每个key对应的多个value进行merge 操作,最重要的是它能够在本地先进行merge操作,并且 merge操作可以通过函数自定义 reduceByKey和groupByKey的区别
- groupByKey也是对每个key进行操作,但只生成一个 sequence,groupByKey本身不能自定义函数,需要先用 groupByKey生成RDD,然后才能对此RDD通过map进行 自定义函数操作
-
keys
:
pairRDD.keys
pairRDD.keys.foreach(println)
-
values
:pairRDD.values
-
sortByKey()
和sortBy()
:val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9))) d1.reduceByKey(_+_).foreach(println) // (g,21) // (b,38) // (a,42) // (f,29) // (d,9) // (e,17) // (c,27) d1.reduceByKey(_+_).foreach(print) // (f,29)(d,9)(a,42)(g,21)(b,38)(c,27)(e,17) d1.reduceByKey(_+_).collect // res1: Array[(String, Int)] = Array((a,42), (b,38), (c,27), (d,9), (e,17), (f,29), (g,21)) d1.reduceByKey(_+_).sortByKey().collect // res4: Array[(String, Int)] = Array((a,42), (b,38), (c,27), (d,9), (e,17), (f,29), (g,21)) d1.reduceByKey(_+_).sortBy(_._2).collect // res3: Array[(String, Int)] = Array((d,9), (e,17), (g,21), (c,27), (f,29), (b,38), (a,42))
-
mapValues(func)
:对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化
d1.collect
// res16: Array[(String, Int)] = Array((c,8), (b,25), (c,17), (a,42), (b,4), (d,9), (e,17), (c,2), (f,29), (g,21), (b,9))
d1.mapValues(_+1).collect
// res17: Array[(String, Int)] = Array((c,9), (b,26), (c,18), (a,43), (b,5), (d,10), (e,18), (c,3), (f,30), (g,22), (b,10))
-
join
:表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有 在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5))) val pairRDD2 = sc.parallelize(Array(("spark","fast"))) pairRDD1.join(pairRDD2).collect // res18: Array[(String, (Int, String))] = Array((spark,(1,fast)), (spark,(2,fast)))
-
combineByKey
:
5.2.3 一个综合实例
给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值 对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值, 也就是计算每种图书的每天平均销量。
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
5.3 共享变量
-
广播变量(broadcast variables)
- 广播变量用来把变量在所有节点的内存之间进行共享
- 可以通过调用
SparkContext.broadcast(v)
来从一个普通变量v中创建一个广 播变量。
-
累加器(accumulators)
-
累加器则支持在所有不同节点之间进行累加计算,通常可以被用来实 现计数器(counter)和求和(sum)。
-
一个数值型的累加器,可以通过调用
SparkContext.longAccumulator()
或者SparkContext.doubleAccumulator()
来创建。 -
运行在集群中 的任务,就可以使用add方法来把数值累加到累加器上,但 是,这些任务只能做累加操作,不能读取累加器的值,只 有任务控制节点(Driver Program)可以使用
value
方法来 读取累加器的值val accum = sc.longAccumulator("My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) accum.value //res1: Long = 10
-
5.4 数据读写
5.4.1 文件数据读写
5.4.1.1 本地文件系统的数据读写
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")
5.4.1.2 分布式文件系统HDFS的数据读写
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
// val textFile = sc.textFile("/user/hadoop/word.txt")
// val textFile = sc.textFile("word.txt")
textFile.first()
5.4.1.3 JSON文件的数据读写
import scala.util.parsing.json.JSON
val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
// jsonStr.foreach(println)
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach( {r => r match {case Some(map: Map[String, Any]) => println(map)case None => println("Parsing failed")case other => println("Unknown data structure: " + other)
剩下的部分直接参考林老师的ppt吧
5.4.2 读写HBase数据
5.5 WordCount程序解析
5.6 综合案例
6 Spark SQL
6.1 简介
hive on spark == Shark,hive将SQL语句转为MR;Shark将SQL转为Spark的应用程序代码;Shark建立在hive上,受限与hive,但是效率提升了10-100倍;MR是进程级别的并行,Shark是线程级别的并行,存在线程安全的保证,因此之后停止了更新Spark SQL。
spark SQL在兼容Hive基础上,只是借鉴了Hive的语法解析
6.2 DataFrame和RDD区别
spark SQL采用的不是RDD,而是DataFrame。DataFrame是结构化的对象,查询效率更高。
6.3 DataFrame的创建
-
创建
import org.apache.spark.sql.SparkSession val spark=SparkSession.builder().getOrCreate() // 使支持RDDs转换DF及后续sql操作 import spark.implicits._ val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json") df.show() val df2 = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load("file:///usr/local/spark/mycode/tag_uid_idx.csv") println(df.columns)
后两行的结果:
```df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] +----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+```
-
一些常用的DataFrame操作
df.printSchema() //打印模式信息 df.select(df("name"),df("age")+1).show() //选择多列 df.filter(df("age")>20)show() // 条件过滤 df.groupBy("age").count().show() // 分组聚合 df.sort(df.("age").desc).show() // 排序 df.sort(df.("age").desc,df.("name").asc).show() // 多列排序 df.select(df.("age"),df.("name").as("username")).show() //对列重命名
6.4 从RDD到DF
6.4.1 利用反射机制推断RDD模式
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame
case class Person(name: String, age: Long) // 定义一个case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView(“people”) // 必须注册为临时表才能供下面的查询使用
val personsRDD = spark.sql("select name,age from people where age > 20") // 最终生成一个DataFrame
personsRDD.map(t => “Name:”+t(0)+“,”+“Age:”+t(1)).show() // DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
6.4.2 使用编程方式定义RDD模式
当无法提前定义case class时,就需要采用编程方式定义RDD模式
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
val schemaString = "name age"
//根据模式字符串生成模式
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
//从上面打印的信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
//必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
6.4.3 把RDD保存成文件
- 第一种保存方法
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
val textFile = sc.textFile("file:///usr/local/spark/mycode/newpeople.csv")
write.format()
支持输出 json,parquet, jdbc, orc, libsvm, csv, text等格式文件
- 第二种
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
6.5 读取和保存数据
6.5.1 读写Parquet
6.5.2 通过JDBC连接数据库
Ubuntu安装MySQL及常用操作
mysql的jdbc驱动程序,下载地址
6.5.3 连接Hive读写数据
6.5.3.1 Hive简介和安装
《Ubuntu安装hive,并配置mysql作为元数据库》
6.5.3.2 让Spark包含Hive支持
测试spark版本是否支持Hive
import org.apache.spark.sql.hive.HiveContext
// 支持的输出:import org.apache.spark.sql.hive.HiveContext
6.5.3.3 在Hive中创建数据库和表
- 启动hadoop:
start-all.sh
(已经将hadoop的路径加入到环境变量中) - 启动Hive:
hive
, 添加数据表
// hive脚本下执行
create database if not exists sparktest;//创建数据库sparktest
show databases;
create table if not exists sparktest.student(id int,name string, gender string, age int);
use sparktest; //切换到sparktest
show tables; //显示sparktest数据库下面有哪些表
insert into student values(1,'Xueqian','F',23); //插入一条记录
insert into student values(2,'Weiliang','M',24); //再插入一条记录
select * from student; //显示student表中的记录
6.5.3.4 连接Hive读写数据
- 在
spark-shell
(包含Hive支持)中执行以下命令从Hive中读取数据
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
val warehouseLocation = "spark-warehouse”
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
sql("SELECT * FROM sparktest.student").show()
- 编写程序向Hive数据库的sparktest.student表中插入两条数据
// 准备两条数据
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// 设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// 下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim,p(3).toInt))
// 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// 查看studentDF
studentDF.show()
// 注册临时表
studentDF.registerTempTable("tempTable")
// 插入
sql("insert into sparktest.student select * from tempTable")
更多推荐
《大数据处理技术Spark》
发布评论