学习笔记(6)"/>
大数据高级开发工程师——Spark学习笔记(6)
文章目录
- Spark内存计算框架
- Spark SQL
- SparkSQL概述
- 1. SparkSQL的前世今生
- 2. 什么是 SparkSQL
- SparkSQL的四大特性
- 1. 易整合
- 2. 统一的数据源访问
- 3. 兼容Hive
- 4. 支持标准的数据库连接
- DataFrame概述
- 1. DataFrame发展
- 2. DataFrame是什么
- 3. DataFrame和RDD的优缺点
- RDD
- DataFrame
- 初识DataFrame
- 1. 读取文件文件
- 2. 读取json文件
- 3. 读取parquet文件
- 4. 通过StructType动态指定Schema
- DataFrame常用操作
- 1. DSL风格语法
- 2. SQL风格语法(常用)
- DataSet概述
- 1. DataSet是什么
- 2. 如何构建DataSet
- 3. RDD、DataFrame、DataSet对比
- 读取外部数据源
- 1. SparkSQL读取MySQL数据
- 2. Spark操作CSV文件并将结果写入MySQL
- 3. Spark与Hive整合
- spark整合hive——通过SparkSql-shell
- spark的thrift server与hive进行远程交互
- 4. 读写Hive数据
- 5. 读写HBase数据
- SparkSQL自定义函数
- 1. 自定义UDF函数:一对一
- 2. 自定义UDAF函数:多对一
- 3. 自定义UDTF函数:一对多
Spark内存计算框架
Spark SQL
SparkSQL概述
1. SparkSQL的前世今生
- Shark 是专门针对于 spark 的,构建大规模数据仓库系统的一个框架。
- Shark 依赖 hive、与 Hive 兼容、同时也依赖于 Spark 版本。
- HiveSql底层把 sql 解析成了 mapreduce 程序,Shark 是把 sql 语句解析成了 Spark 任务。
- 随着性能优化的上限,以及集成 SQL 的一些复杂的分析功能,发现 Hive 的 MapReduce 思想限制了 Shark 的发展。
- 最后 Databricks 公司终止对 Shark 的开发,决定单独开发一个框架,不再依赖 hive,把重点转移到了 sparksql 这个框架上。
2. 什么是 SparkSQL
- 官方文档:/
Spark SQL is Apache Spark’s module for working with structured data.
- SparkSQL是 Apache Spark 用来处理结构化数据的一个模块。
SparkSQL的四大特性
1. 易整合
- 将 SQL 查询与Spark 程序无缝混合
- 即对结构化数据进行查询,可以使用 sql 分析;
- 也可以使用 DataFrame、DataSet api;
- 可以使用不同的语言进行代码开发java、scala、python、R
2. 统一的数据源访问
- 以相同的方式(相同风格的API)连接到任何数据源
// sparksql 可以采用一种统一的方式去对接任意的外部数据源
val dataFrame = sparkSession.read.文件格式的方法名("该文件格式的路径")
3. 兼容Hive
- Spark 支持 SQL 以及 HiveQL 语法;
- 支持 Hive SerDes;
- 支持 UDF;
- 可以接入已存在的 Hive 数仓;
- Spark SQL 使用 Hive 的 metastore 服务。
4. 支持标准的数据库连接
- Spark SQL 支持标准的数据库连接JDBC或者ODBC。
DataFrame概述
- Spark Core:操作 RDD ==>> 封装了数据 ==>> 对应的操作入口类 SparkContext。
- Spark SQL:编程抽象 DataFrame ==>> 对应的操作入口类 SparkSession。
- 从 Spark 2.0 开始,SparkSession 是 Spark 新的查询起始点,其内部封装了 SparkContext,所以计算的本质还是由 SparkContext 完成。
1. DataFrame发展
- DataFrame 的前身是 schemaRDD,这个 schemaRDD 是直接继承自 RDD,它是 RDD 的一个实现类。
- 在 Spark 1.3.0 之后把 schemaRDD 改名为 DataFrame
- 它不再继承自 RDD;
- 而是自己实现 RDD 上的一些功能。
- 也可以把 DataFrame 转换成一个 RDD:通过调用 DataFrame 的一个方法
val rdd = dataFrame.rdd
2. DataFrame是什么
- 在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格。
- DataFrame 带有数据的结构信息:
- Schema元信息;
- DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
- DataFrame 可以从很对数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表等。
- RDD 可以把它内部元素看成是一个java对象,DataFrame 可以把内部看成是一个个Row对象,它表示一行一行的数据。
- 总结:
- 可以把 DataFrame 这样去理解:RDD + schema 元信息
- dataFrame相比于rdd来说,多了对数据的描述信息(schema元信息)。
3. DataFrame和RDD的优缺点
RDD
- 优点:
- 编译时类型安全:开发会进行类型检查,在编译的时候及时发现错误
- 具有面向对象编程的风格
- 缺点:
- font color=red>构建大量的java对象占用了大量heap堆空间,导致频繁的GC
- 由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC)
- 程序在进行垃圾回收的过程中,所有的任务都是暂停(STW stop the world),影响程序执行的效率
- 数据的序列化和反序列性能开销很大
- 在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输
- 然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
- font color=red>构建大量的java对象占用了大量heap堆空间,导致频繁的GC
DataFrame
- 优点:
- DataFrame引入了schema元信息和off-heap(堆外内存)
- DataFrame引入off-heap
- 大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存
- 这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高
- 它是解决了RDD构建大量的java对象占用了大量heap堆空间,避免导致频繁的GC这个缺点。
- DataFrame引入了schema元信息:就是数据结构的描述信息
- spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉
- 这样一来数据网络传输的数据量是有所减少
- 数据的序列化和反序列性能开销就不是很大了
- 它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
- 缺点:DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
- 编译时类型不安全:编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
- 不在具有面向对象编程的风格:类似二维表
初识DataFrame
1. 读取文件文件
- resources目录下创建文件 person.txt,内容如下
1 youyou 38
2 Tony 25
3 laowang 18
4 dali 30
- 代码实现:
object Case01_ReadText {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val df: DataFrame = spark.read.text(this.getClass.getClassLoader.getResource("person.txt").getPath)/*** 打印schema信息* root* |-- value: string (nullable = true)*/df.printSchemaprintln("----------------")println(df.count()) // 4/*** +------------+* | value|* +------------+* | 1 youyou 38|* | 2 Tony 25|* |3 laowang 18|* | 4 dali 30|* +------------+*/println("----------------")df.show()ss.stop()}
}
- 改造代码,输出成对象形式的二维表格
case class Person(id: String, name: String, age: Int)object Case02_ReadTextV2 {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")// 添加隐式转换import spark.implicits._val rdd1: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将 rdd 与样例类进行关联val personRDD: RDD[Person] = rdd1.map(x => Person(x(0), x(1), x(2).toInt))// 将 rdd 转成 DataFrameval df = personRDD.toDF/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = false)*/df.printSchema()/*** +---+-------+---+* | id| name|age|* +---+-------+---+* | 1| youyou| 38|* | 2| Tony| 25|* | 3|laowang| 18|* | 4| dali| 30|* +---+-------+---+*/df.show()spark.stop()}
}
2. 读取json文件
- 在 resources 目录新建 person.json 文件,内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
- 代码实现
object Case03_ReadJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("person.json").getPath)/*** root* |-- age: long (nullable = true)* |-- name: string (nullable = true)*/df.printSchemaprintln("--------------")/*** +----+-------+* | age| name|* +----+-------+* |null|Michael|* | 30| Andy|* | 19| Justin|* +----+-------+*/df.show()spark.stop()}
}
3. 读取parquet文件
- Spark 自带样例文件
spark-2.3.3-bin-hadoop2.7/examples/src/main/resources/users.parquet
复制到自己的工程的 resources 目录 - 代码实现:
object Case04_ReadParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.parquet(this.getClass.getClassLoader.getResource("users.parquet").getPath)/*** root* |-- name: string (nullable = true)* |-- favorite_color: string (nullable = true)* |-- favorite_numbers: array (nullable = true)* | |-- element: integer (containsNull = true)*/df.printSchema/*** +------+--------------+----------------+* | name|favorite_color|favorite_numbers|* +------+--------------+----------------+* |Alyssa| null| [3, 9, 15, 20]|* | Ben| red| []|* +------+--------------+----------------+*/df.showspark.stop()}
}
4. 通过StructType动态指定Schema
- 应用场景:在开发代码之前,无法确定需要的 DataFrame 对应的 Schema 元信息,这时需要在开发代码的过程中指定。
- 代码实现:
object Case05_StructTypeSchema {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val rdd: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将rdd与Row对象关联val rowRDD: RDD[Row] = rdd.map(x => Row(x(0), x(1), x(2).toInt))// 指定dataFrame的schema信息,这里指定的字段个数和类型必须要跟Row对象保持一致val schema = StructType(StructField("id", StringType) ::StructField("name", StringType) ::StructField("age", IntegerType) :: Nil)// 利用rdd生成DataFrameval df: DataFrame = spark.createDataFrame(rowRDD, schema)/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = true)*/df.printSchema/*** +---+-------+---+* | id| name|age|* +---+-------+---+* | 1| youyou| 38|* | 2| Tony| 25|* | 3|laowang| 18|* | 4| dali| 30|* +---+-------+---+*/df.show()// 用sql的方式查询结构化数据df.createTempView("person")/*** +---+-------+---+* | id| name|age|* +---+-------+---+* | 1| youyou| 38|* | 2| Tony| 25|* | 3|laowang| 18|* | 4| dali| 30|* +---+-------+---+*/spark.sql("select * from person").show()spark.stop()}
}
DataFrame常用操作
1. DSL风格语法
- 就是 sparksql 中的 DataFrame 自身提供了一套自己的 Api,可以去使用这套 api 来做相应的处理
- 在 Case02_ReadTextV2.scala 中:
// 将 rdd 转成 DataFrameval personDF = personRDD.toDFpersonDF.printSchema()personDF.show()/************************** DSL风格语法 start *************************/// 1. 查询指定字段personDF.select("name").showpersonDF.select($"name").show// 2. 实现 age+1personDF.select($"name", $"age", $"age" + 1).show// 3. 实现 age>30 过滤personDF.filter($"age" > 30).show// 4. 按照 age 分组统计personDF.groupBy("age").count.show// 5. 按照age分组统计次数降序personDF.groupBy("age").count().sort($"age".desc).show/************************** DSL风格语法 end *************************/
2. SQL风格语法(常用)
- 可以把 DataFrame 注册成一张表,然后通过 sparkSession.sql(sql语句) 操作
/************************** SQL风格语法 start *************************/// 1. DataFrame注册成表personDF.createTempView("person")// 2. 使用SparkSession调用sql方法统计查询spark.sql("select * from person").showspark.sql("select name from person").showspark.sql("select name, age from person").showspark.sql("select * from person where age > 30").showspark.sql("select count(*) from person where age > 30").showspark.sql("select age, count(*) from person group by age").showspark.sql("select age, count(*) as count from person group by age").showspark.sql("select * from person order by age desc").show/************************** SQL风格语法 end *************************/
DataSet概述
1. DataSet是什么
- DataSet 是分布式的数据集合,Dataset 提供了强类型支持,也是在 RDD 的每行数据加了类型约束。
- DataSet 是 DataFrame 的一个扩展,是 SparkSQL1.6 后新增的数据抽象,API 友好
- 它集中了 RDD 的优点(强类型和可以用强大lambda函数)
- 以及使用了 Spark SQL 优化的执行引擎。
- DataFrame 是 DataSet 的特例,
type DataFrame=DataSet[Row]
- 可以通过 as 方法将 DataFrame 转换成 DataSet
- Row 是一个类型,可以是 Person、Animal,所有的表结构信息都用 Row 来表示
- 优点:
- DataSet 可以在编译时检查类型
- 并且是面向对象的编程接口
2. 如何构建DataSet
- 方式一:通过 sparkSession 调用 createDataset 方法
val ds = spark.createDataset(1 to 10) // scala 集合
ds.showval ds = spark.createDataset(sc.textFile("/person.txt")) //rdd
ds.show
- 方式二:使用 scala 集合和 rdd 调用 toDS 方法
sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS
- 方式三:把一个 DataFrame 转换成 DataSet
val ds = dataFrame.as[强类型]
- 方式四:通过一个 DataSet 转换生成一个新的 DataSet
List(1,2,3,4,5).toDS.map(x => x * 10)
3. RDD、DataFrame、DataSet对比
关系是怎样的?
- 首先,Spark RDD、DataFrame 和 DataSet 是 Spark 的三类 API,他们的发展过程:
- DataFrame 是 spark1.3.0 版本提出来的,spark1.6.0 版本又引入了 DateSet;
- 但是在 spark2.0 版本中,DataFrame 和 DataSet 合并为DataSet。
- 那么你可能会问了:那么,在2.0以后的版本里,RDD是不是不需要了呢?
- 答案是:NO!
- 首先,DataFrame 和 DataSet 都是基于 RDD 的,而且这三者之间可以通过简单的API调用进行无缝切换。
数据有什么区别?
三者 API 特点
- RDD:
- 优点:相比于传统的 MapReduce 框架,Spark 在 RDD 中内置很多函数操作,group、map、filter等,方便处理结构化或非结构化数据。面向对象编程,直接存储的 java 对象,类型转化也安全。
- 缺点:由于它基本和 hadoop 一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于 sql 来比非常麻烦。默认采用的是 java 序列号方式,序列化结果比较大,而且数据存储在 java 堆内存中,导致 gc 比较频繁。
- DataFrame:
- 优点:结构化数据处理非常方便,支持Avro、CSV、ElasticSearch 和 Cassandra 等 kv 数据,也支持 HIVE tables、MySQL 等传统数据表。有针对性的优化:采用 Kryo 序列化;由于数据结构元信息 spark 已经保存,序列化时不需要带上元信息,大大的减少了序列化大小;而且数据保存在堆外内存中,减少了 gc 次数,所以运行更快。hive兼容,支持hql、udf等。
- 缺点:编译时不能类型转化安全检查,运行时才能确定是否有问题。对于对象支持不友好,rdd 内部数据直接以 java 对象存储,dataframe 内存存储的是 row 对象而不能是自定义对象。
- DataSet:
- 优点:DateSet 整合了 RDD 和 DataFrame 的优点,支持结构化和非结构化数据。和 RDD 一样,支持自定义对象存储。和 ataFrame 一样,支持结构化数据的 sql 查询。采用了堆外内存存储,gc 友好。类型转化安全,代码友好。
三者如何相互转换
- 涉及到RDD,DataFrame,DataSet之间操作时,需要隐式转换导入:import spark.implicits._
- 这里的 spark 不是包名,而是代表了 SparkSession 的那个对象名,所以必须先创建 SparkSession 对象再导入
case class Person(id: String, name: String, age: Int)object Case06_SparkConversion {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val sc = spark.sparkContextsc.setLogLevel("WARN")// 隐式转换import spark.implicits._val rdd = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 把rdd与样例类进行关联val personRDD = rdd.map(x => Person(x(0), x(1), x(2).toInt))// 1. rdd -> dfval df1 = personRDD.toDFdf1.show// 2. rdd -> dsval ds1 = personRDD.toDSds1.show// 3. df -> rddval rdd1 = df1.rddprintln(rdd1.collect.toList)// 4. ds -> rddval rdd2 = ds1.rddprintln(rdd2.collect.toList)// 5. ds -> dfval df2: DataFrame = ds1.toDFdf2.show// df -> dsval ds2: Dataset[Person] = df2.as[Person]ds2.showspark.stop()}
}
读取外部数据源
1. SparkSQL读取MySQL数据
- Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一些列的计算后,还可以将数据再写回关系型数据库中。
- 代码示例:
/*** 使用 SparkSQL读写MySQL表中的数据*/
object Case07_ReadMySQL {def main(args: Array[String]): Unit = {// 1. 创建 SparkConf 对象val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")// 2. 创建 SparkSession 对象val spark = SparkSession.builder().config(conf).getOrCreate()// 3. 创建 DataFrameval url = "jdbc:mysql://192.168.254.132:3306/mydb?characterEncoding=UTF-8"val tableName = "jobdetail"val props = new Properties()props.setProperty("user", "root")props.setProperty("password", "123456")val mysqlDF: DataFrame = spark.read.jdbc(url, tableName, props)// 4. 读取 MySQL 表中的数据// 4.1 打印schema信息mysqlDF.printSchema()// 4.2 展示数据mysqlDF.show()// 4.3 将dataFrame注册成表mysqlDF.createTempView("job_detail")spark.sql("select * from job_detail where city = '广东'").show()spark.stop()}
}
2. Spark操作CSV文件并将结果写入MySQL
object Case08_ReadCsvWriteMySQL {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") // 时间转换.option("header", "true") // 第一行数据都是head(字段属性的意思)
// .option("multiLine", "true") // 数据可能换行.load(this.getClass.getClassLoader.getResource("data").getPath)df.createOrReplaceTempView("job_detail")spark.sql("select job_name,job_url,job_location,job_salary,job_company,job_experience,job_class,job_given,job_detail,company_type,company_person,search_key,city from job_detail where job_company = '北京无极慧通科技有限公司'").show(80)val props = new Properties()props.put("user", "root")props.put("password", "123456")df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.254.132:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8","mydb.jobdetail_copy", props)}
}
3. Spark与Hive整合
- Spark on hive:Spark 通过 Spark SQL 使用 Hive 的语句操作 Hive,底层运行的还是 spark rdd:
- 通过 spark sql,加载 hive 的配置文件,获取到 hive 的元数据信息;
- spark sql 获取到 hive 的元数据信息之后就可以拿到 hive 的所有表的数据;
- 接下来就可以通过 spark sql 来操作 hive 表中的数据。
- Hive on spark:将 Hive 查询,从 MapReduce的 MR(Hadoop计算引擎) 操作替换为 spark rdd(spark执行引擎)操作。相对于 spark on hive,这个实现起来则麻烦很多,必须重新编译你的 spark 和导入 jar 包,不过目前大部分使用的是 spark on hive。
spark整合hive——通过SparkSql-shell
- 拷贝 hive-site.xml 配置文件:将 node03 服务器安装的 hive 目录下 conf 文件夹下面的 hive-site.xml 拷贝到 spark 安装的各个机器节点,node03 执行以下命令进行拷贝
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
- 拷贝 mysql 驱动包:将 hive 当中 mysql 的连接驱动包拷贝到 spark 安装家目录下的 jars 目录下,node03 执行下命令拷贝 mysql 的 lib 驱动包
$ ll mysql-connector-java-5.1.38.jar
-rw-rw-r--. 1 hadoop hadoop 983911 12月 6 2021 mysql-connector-java-5.1.38.jar
$ pwd
/bigdata/install/hive-3.1.2/lib
$ scp mysql-connector-java-5.1.38.jar node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
- 进入 spark-sql 直接操作 hive 数据库当中的数据:
- 在 spark2.0 版本后,由于出现了 sparkSession,在初始化 sqlContext 时,会设置默认的
spark.sql.warehouse.dir=spark-warehouse
,此时将 hive 与 spark sql 整合完成后,在通过 spark-sql 脚本启动时,会在当前目录下创建一个 spark.sql.warehouse.dir 为 spark-warehouse 的目录,存放由 spark-sql 创建数据库和创建表的数据信息,与之前 hive 的数据信息不是放在同一个路径下(可以互相访问)。但是此时 spark-sql 中表的数据在本地,不利于操作,也不安全。 - 所有在启动的时候需要加上下面这样一个参数,以保证 spark-sql 启动时不再产生新的存放数据的目录,sparksql 与 hive 最终使用的是 hive 统一存放数据的目录。
- 在 spark2.0 版本后,由于出现了 sparkSession,在初始化 sqlContext 时,会设置默认的
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse
- 在 hive 中创建表并插入数据:
CREATE EXTERNAL TABLE `student`(`ID` bigint COMMENT '',`CreatedBy` string COMMENT '创建人',`CreatedTime` string COMMENT '创建时间',`UpdatedBy` string COMMENT '更新人',`UpdatedTime` string COMMENT '更新时间',`Version` int COMMENT '版本号',`name` string COMMENT '姓名'
) COMMENT '学生表'
PARTITIONED BY (`dt` String COMMENT 'partition')
row format delimited fields terminated by '\t'
location '/student';INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(1, "xxx", "2022-07-12", "", "", 1, "zhangsan");
INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(2, "xxx", "2022-07-12", "", "", 2, "lisi");
- 通过 shell 方式:node01 直接执行以下命令,进入 spark-sql 交互界面,然后操作 hive 当中的数据
$ spark-sql --master local[2] \
--executor-memory 512m --total-executor-cores 3 \
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse# 执行查询
select * from student;
- 通过脚本方式:编写如下脚本并执行
#!/bin/sh
# 定义 spark sql 提交脚本的头信息
SUBMIT_INFO="spark-sql --master spark://node01:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse"
# 定义一个 sql 语句
SQL="select * from student;"
# 执行 sql 语句,类似于 hive -e sql
echo "$SUBMIT_INFO"
echo "$SQL"
$SUBMIT_INFO -e "$SQL"
- 执行:
$ sh spark_on_hive.sh
spark的thrift server与hive进行远程交互
- 除了可以通过 spark-shell 来与 hive 进行整合之外,我们也可以通过 spark 的 thrift 服务来远程与 hive 进行交互。
- node03 执行以下命令修改 hive-site.xml 的配置属性,添加以下几个配置
<property><name>hive.metastore.uris</name><value>thrift://node03:9083</value><description>Thrift URI for the remote metastore</description>
</property>
<property><name>hive.server2.thrift.min.worker.threads</name><value>5</value>
</property>
<property><name>hive.server2.thrift.max.worker.threads</name><value>500</value>
</property>
- 修改完的配置文件后,分发到其他机器:
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
- node03 启动 metastore 服务
hive --service metastore
- node03 执行以下命令启动 spark 的 thrift server:hive 安装在哪一台,就在哪一台服务器启动spark 的 thrift server
$ pwd
/bigdata/install/spark-2.3.3-bin-hadoop2.7/sbin
$ ./start-thriftserver.sh --master local[*] --executor-memory 2g --total-executor-cores 5
- 直接使用 beeline 来连接:直接在 node03 服务器上面使用 beeline 来进行连接 spark-sql
$ beeline --color=true
beeline> !connect jdbc:hive2://node03:10000
Connecting to jdbc:hive2://node03:10000
Enter username for jdbc:hive2://node03:10000: hadoop
Enter password for jdbc:hive2://node03:10000: ******
4. 读写Hive数据
- 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version>
</dependency>
- 将服务端配置 hive-site.xml,放入到 idea 的 resources 目录下
- 代码实现:
object Case09_SparkSQLOnHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").enableHiveSupport() // 启用hive.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse").getOrCreate()val df: DataFrame = spark.sql("select * from student")df.show()// 直接写表达式,通过 insert into 插入df.write.saveAsTable("student1")spark.sql("insert into student1 select * from student")}
}
5. 读写HBase数据
- 需要添加依赖:
<dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.3.0</version>
</dependency>
- 创建 HBase 表,并插入数据:
create 'spark_hbase','info'
put 'spark_hbase','0001','info:name','tangseng'
put 'spark_hbase','0001','info:age','30'
put 'spark_hbase','0001','info:sex','0'
put 'spark_hbase','0001','info:addr','beijing'
put 'spark_hbase','0002','info:name','sunwukong'
put 'spark_hbase','0002','info:age','508'
put 'spark_hbase','0002','info:sex','0'
put 'spark_hbase','0002','info:addr','shanghai'
put 'spark_hbase','0003','info:name','zhubajie'
put 'spark_hbase','0003','info:age','715'
put 'spark_hbase','0003','info:sex','0'
put 'spark_hbase','0003','info:addr','shenzhen'
put 'spark_hbase','0004','info:name','bailongma'
put 'spark_hbase','0004','info:age','1256'
put 'spark_hbase','0004','info:sex','0'
put 'spark_hbase','0004','info:addr','donghai'
put 'spark_hbase','0005','info:name','shaheshang'
put 'spark_hbase','0005','info:age','1008'
put 'spark_hbase','0005','info:sex','0'
put 'spark_hbase','0005','info:addr','tiangong'create "spark_hbase_copy",'info'
- 代码实现:
object Case10_SparkSQLOnHBase {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()spark.sparkContext.setLogLevel("WARN")import spark.implicits._val hconf: Configuration = HBaseConfiguration.createhconf.set(HConstants.ZOOKEEPER_QUORUM, "node01:2181,node02:2181,node03:2181")val hbaseContext = new HBaseContext(spark.sparkContext, hconf)// 定义映射的 catalogval catalog: String = "{\"table\":{\"namespace\":\"default\",\"name\":\"spark_hbase\"},\"rowkey\":\"key\",\"columns\":{\"f0\":{\"cf\":\"rowkey\",\"col\":\"key\",\"type\":\"string\"},\"f1\":{\"cf\":\"info\",\"col\":\"addr\",\"type\":\"string\"},\"f2\":{\"cf\":\"info\",\"col\":\"age\",\"type\":\"boolean\"},\"f3\":{\"cf\":\"info\",\"col\":\"name\",\"type\":\"string\"}}}";// 读取HBase数据val ds: DataFrame = spark.read.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalog).load()ds.show(10)val catalogCopy: String = catalog.replace("spark_hbase", "spark_hbase_out")// 数据写入HBaseds.write.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalogCopy).mode(SaveMode.Overwrite).save()}
}
SparkSQL自定义函数
- 用户自定义函数类别分为以下三种:
- ① UDF:输入一行,返回一个结果(一对一)
- ② UDAF:输入多行,返回一行,这里的是 aggregate,聚合的意思,如果业务复杂,需要自己实现聚合函数
- ③ UDTF:输入一行,返回多行(一对多),在 SparkSQL 中没有,因为 Spark 中使用 flatMap 即可实现这个功能
1. 自定义UDF函数:一对一
- 需求:读取深圳二手房成交数据,对房子的年份进行自定义函数处理,如果没有年份,那么就给默认值1990。
object Case11_SparkUDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDFspark.udf.register("house_udf", new UDF1[String, String] {val pattern: Pattern = Pattern.compile("^[0-9]*$")override def call(input: String): String = {val matcher = pattern.matcher(input)if (matcher.matches()) inputelse "1990"}}, DataTypes.StringType)// 使用UDFspark.sql("select house_udf(house_age) from house_sale limit 200").show()spark.stop()}
}
2. 自定义UDAF函数:多对一
- 需求:自定义UDAF函数,读取深圳二手房数据,然后按照楼层进行分组,求取每个楼层的平均成交金额
object Case12_SparkUDAF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")spark.sql("select floor from house_sale limit 30").show()spark.udf.register("udaf", new MyAverage)spark.sql("select floor, udaf(house_sale_money) from house_sale group by floor").show()df.printSchema()spark.stop()}
}
class MyAverage extends UserDefinedAggregateFunction {// 聚合函数输入函数的数据类型override def inputSchema: StructType = StructType(StructField("floor", DoubleType) :: Nil)// 聚合缓冲区中值的数据类型override def bufferSchema: StructType = {StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)}// 返回值类型override def dataType: DataType = DoubleType// 对于相同输入是否一直返回相同的输出override def deterministic: Boolean = true// 初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {// 用于存储不同类型的楼房的总成交额buffer(0) = 0D// 用于存储不同类型的楼房的总个数buffer(1) = 0L}// 相同Execute间的数据合并(分区内聚合)override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getDouble(0) + input.getDouble(0)buffer(1) = buffer.getLong(1) + 1}}// 不同Execute间的数据合并(分区外聚合)override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果override def evaluate(buffer: Row): Any = buffer.getDouble(0) / buffer.getLong(1)
}
3. 自定义UDTF函数:一对多
- 需求:自定义UDTF函数,读取深圳二手房数据,然后将 part_place(部分地区)以空格切分进行展示
object Case13_SparkUDTF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDTF算子,这里无法使用sparkSession.udf.register(),注意包全路径spark.sql("CREATE TEMPORARY FUNCTION MySplit as 'com.yw.spark.example.sql.cases.MySplit'")spark.sql("select part_place, MySplit(part_place, ' ') from house_sale limit 50").show()spark.stop()}
}class MySplit extends GenericUDTF {override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {if (args.length != 2) {throw new UDFArgumentLengthException("UserDefinedUDTF takes only two argument")}// 判断第一个参数是不是字符串参数if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")}// 列名:会被用户传递的覆盖val fieldNames: ArrayList[String] = new ArrayList[String]()fieldNames.add("col1")// 返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数val fieldOIs: ArrayList[ObjectInspector] = new ArrayList[ObjectInspector]()fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)}override def process(objects: Array[AnyRef]): Unit = {// 获取数据val data: String = objects(0).toString// 获取分隔符val splitKey: String = objects(1).toString// 切分数据val words: Array[String] = data.split(splitKey)// 遍历写出words.foreach(x => {// 将数据放入集合val tmp: Array[String] = new Array[String](1)tmp(0) = xforward(tmp)})}override def close(): Unit = {// 没有流操作}
}
- 相关代码 github 地址:
更多推荐
大数据高级开发工程师——Spark学习笔记(6)
发布评论