大数据高级开发工程师——Spark学习笔记(6)

编程入门 行业动态 更新时间:2024-10-27 06:19:10

大数据高级开发工程师——Spark<a href=https://www.elefans.com/category/jswz/34/1770117.html style=学习笔记(6)"/>

大数据高级开发工程师——Spark学习笔记(6)

文章目录

  • Spark内存计算框架
    • Spark SQL
      • SparkSQL概述
        • 1. SparkSQL的前世今生
        • 2. 什么是 SparkSQL
      • SparkSQL的四大特性
        • 1. 易整合
        • 2. 统一的数据源访问
        • 3. 兼容Hive
        • 4. 支持标准的数据库连接
      • DataFrame概述
        • 1. DataFrame发展
        • 2. DataFrame是什么
        • 3. DataFrame和RDD的优缺点
          • RDD
          • DataFrame
      • 初识DataFrame
        • 1. 读取文件文件
        • 2. 读取json文件
        • 3. 读取parquet文件
        • 4. 通过StructType动态指定Schema
      • DataFrame常用操作
        • 1. DSL风格语法
        • 2. SQL风格语法(常用)
      • DataSet概述
        • 1. DataSet是什么
        • 2. 如何构建DataSet
        • 3. RDD、DataFrame、DataSet对比
      • 读取外部数据源
        • 1. SparkSQL读取MySQL数据
        • 2. Spark操作CSV文件并将结果写入MySQL
        • 3. Spark与Hive整合
          • spark整合hive——通过SparkSql-shell
          • spark的thrift server与hive进行远程交互
        • 4. 读写Hive数据
        • 5. 读写HBase数据
      • SparkSQL自定义函数
        • 1. 自定义UDF函数:一对一
        • 2. 自定义UDAF函数:多对一
        • 3. 自定义UDTF函数:一对多

Spark内存计算框架

Spark SQL

SparkSQL概述

1. SparkSQL的前世今生

  • Shark 是专门针对于 spark 的,构建大规模数据仓库系统的一个框架。
  • Shark 依赖 hive、与 Hive 兼容、同时也依赖于 Spark 版本。
  • HiveSql底层把 sql 解析成了 mapreduce 程序,Shark 是把 sql 语句解析成了 Spark 任务。
  • 随着性能优化的上限,以及集成 SQL 的一些复杂的分析功能,发现 Hive 的 MapReduce 思想限制了 Shark 的发展。
  • 最后 Databricks 公司终止对 Shark 的开发,决定单独开发一个框架,不再依赖 hive,把重点转移到了 sparksql 这个框架上。

2. 什么是 SparkSQL

  • 官方文档:/

Spark SQL is Apache Spark’s module for working with structured data.

  • SparkSQL是 Apache Spark 用来处理结构化数据的一个模块。

SparkSQL的四大特性

1. 易整合

  • 将 SQL 查询与Spark 程序无缝混合
    • 即对结构化数据进行查询,可以使用 sql 分析;
    • 也可以使用 DataFrame、DataSet api;
    • 可以使用不同的语言进行代码开发java、scala、python、R

2. 统一的数据源访问

  • 以相同的方式(相同风格的API)连接到任何数据源
// sparksql 可以采用一种统一的方式去对接任意的外部数据源
val dataFrame = sparkSession.read.文件格式的方法名("该文件格式的路径")

3. 兼容Hive

  • Spark 支持 SQL 以及 HiveQL 语法;
  • 支持 Hive SerDes;
  • 支持 UDF;
  • 可以接入已存在的 Hive 数仓;
  • Spark SQL 使用 Hive 的 metastore 服务。

4. 支持标准的数据库连接

  • Spark SQL 支持标准的数据库连接JDBC或者ODBC。

DataFrame概述

  • Spark Core:操作 RDD ==>> 封装了数据 ==>> 对应的操作入口类 SparkContext。
  • Spark SQL:编程抽象 DataFrame ==>> 对应的操作入口类 SparkSession。
  • 从 Spark 2.0 开始,SparkSession 是 Spark 新的查询起始点,其内部封装了 SparkContext,所以计算的本质还是由 SparkContext 完成。

1. DataFrame发展

  • DataFrame 的前身是 schemaRDD,这个 schemaRDD 是直接继承自 RDD,它是 RDD 的一个实现类。
  • 在 Spark 1.3.0 之后把 schemaRDD 改名为 DataFrame
    • 它不再继承自 RDD;
    • 而是自己实现 RDD 上的一些功能。
  • 也可以把 DataFrame 转换成一个 RDD:通过调用 DataFrame 的一个方法 val rdd = dataFrame.rdd

2. DataFrame是什么

  • 在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格
  • DataFrame 带有数据的结构信息:
    • Schema元信息
    • DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
  • DataFrame 可以从很对数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表等。
  • RDD 可以把它内部元素看成是一个java对象,DataFrame 可以把内部看成是一个个Row对象,它表示一行一行的数据。

  • 总结:
    • 可以把 DataFrame 这样去理解:RDD + schema 元信息
    • dataFrame相比于rdd来说,多了对数据的描述信息(schema元信息)。

3. DataFrame和RDD的优缺点

RDD
  • 优点:
    • 编译时类型安全:开发会进行类型检查,在编译的时候及时发现错误
    • 具有面向对象编程的风格
  • 缺点:
    • font color=red>构建大量的java对象占用了大量heap堆空间,导致频繁的GC
      • 由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC)
      • 程序在进行垃圾回收的过程中,所有的任务都是暂停(STW stop the world),影响程序执行的效率
    • 数据的序列化和反序列性能开销很大
      • 在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输
      • 然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
DataFrame
  • 优点:
    • DataFrame引入了schema元信息off-heap(堆外内存)
    • DataFrame引入off-heap
      • 大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存
      • 这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高
      • 它是解决了RDD构建大量的java对象占用了大量heap堆空间,避免导致频繁的GC这个缺点
    • DataFrame引入了schema元信息:就是数据结构的描述信息
      • spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略
      • 这样一来数据网络传输的数据量是有所减少
      • 数据的序列化和反序列性能开销就不是很大了
      • 它是解决了RDD数据的序列化和反序列性能开销很大这个缺点
  • 缺点:DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
    • 编译时类型不安全:编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
    • 不在具有面向对象编程的风格:类似二维表

初识DataFrame

1. 读取文件文件

  • resources目录下创建文件 person.txt,内容如下
1 youyou 38
2 Tony 25
3 laowang 18
4 dali 30
  • 代码实现:
object Case01_ReadText {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val df: DataFrame = spark.read.text(this.getClass.getClassLoader.getResource("person.txt").getPath)/*** 打印schema信息* root* |-- value: string (nullable = true)*/df.printSchemaprintln("----------------")println(df.count()) // 4/*** +------------+* |       value|* +------------+* | 1 youyou 38|* |   2 Tony 25|* |3 laowang 18|* |   4 dali 30|* +------------+*/println("----------------")df.show()ss.stop()}
}
  • 改造代码,输出成对象形式的二维表格
case class Person(id: String, name: String, age: Int)object Case02_ReadTextV2 {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")// 添加隐式转换import spark.implicits._val rdd1: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将 rdd 与样例类进行关联val personRDD: RDD[Person] = rdd1.map(x => Person(x(0), x(1), x(2).toInt))// 将 rdd 转成 DataFrameval df = personRDD.toDF/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = false)*/df.printSchema()/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/df.show()spark.stop()}
}

2. 读取json文件

  • 在 resources 目录新建 person.json 文件,内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
  • 代码实现
object Case03_ReadJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.json(this.getClass.getClassLoader.getResource("person.json").getPath)/*** root* |-- age: long (nullable = true)* |-- name: string (nullable = true)*/df.printSchemaprintln("--------------")/*** +----+-------+* | age|   name|* +----+-------+* |null|Michael|* |  30|   Andy|* |  19| Justin|* +----+-------+*/df.show()spark.stop()}
}

3. 读取parquet文件

  • Spark 自带样例文件 spark-2.3.3-bin-hadoop2.7/examples/src/main/resources/users.parquet 复制到自己的工程的 resources 目录
  • 代码实现:
object Case04_ReadParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.parquet(this.getClass.getClassLoader.getResource("users.parquet").getPath)/*** root* |-- name: string (nullable = true)* |-- favorite_color: string (nullable = true)* |-- favorite_numbers: array (nullable = true)* |    |-- element: integer (containsNull = true)*/df.printSchema/*** +------+--------------+----------------+* |  name|favorite_color|favorite_numbers|* +------+--------------+----------------+* |Alyssa|          null|  [3, 9, 15, 20]|* |   Ben|           red|              []|* +------+--------------+----------------+*/df.showspark.stop()}
}

4. 通过StructType动态指定Schema

  • 应用场景:在开发代码之前,无法确定需要的 DataFrame 对应的 Schema 元信息,这时需要在开发代码的过程中指定。
  • 代码实现:
object Case05_StructTypeSchema {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val rdd: RDD[Array[String]] = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 将rdd与Row对象关联val rowRDD: RDD[Row] = rdd.map(x => Row(x(0), x(1), x(2).toInt))// 指定dataFrame的schema信息,这里指定的字段个数和类型必须要跟Row对象保持一致val schema = StructType(StructField("id", StringType) ::StructField("name", StringType) ::StructField("age", IntegerType) :: Nil)// 利用rdd生成DataFrameval df: DataFrame = spark.createDataFrame(rowRDD, schema)/*** root* |-- id: string (nullable = true)* |-- name: string (nullable = true)* |-- age: integer (nullable = true)*/df.printSchema/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/df.show()// 用sql的方式查询结构化数据df.createTempView("person")/*** +---+-------+---+* | id|   name|age|* +---+-------+---+* |  1| youyou| 38|* |  2|   Tony| 25|* |  3|laowang| 18|* |  4|   dali| 30|* +---+-------+---+*/spark.sql("select * from person").show()spark.stop()}
}

DataFrame常用操作

1. DSL风格语法

  • 就是 sparksql 中的 DataFrame 自身提供了一套自己的 Api,可以去使用这套 api 来做相应的处理
  • 在 Case02_ReadTextV2.scala 中:
	// 将 rdd 转成 DataFrameval personDF = personRDD.toDFpersonDF.printSchema()personDF.show()/************************** DSL风格语法 start *************************/// 1. 查询指定字段personDF.select("name").showpersonDF.select($"name").show// 2. 实现 age+1personDF.select($"name", $"age", $"age" + 1).show// 3. 实现 age>30 过滤personDF.filter($"age" > 30).show// 4. 按照 age 分组统计personDF.groupBy("age").count.show// 5. 按照age分组统计次数降序personDF.groupBy("age").count().sort($"age".desc).show/************************** DSL风格语法 end *************************/

2. SQL风格语法(常用)

  • 可以把 DataFrame 注册成一张表,然后通过 sparkSession.sql(sql语句) 操作
    /************************** SQL风格语法 start *************************/// 1. DataFrame注册成表personDF.createTempView("person")// 2. 使用SparkSession调用sql方法统计查询spark.sql("select * from person").showspark.sql("select name from person").showspark.sql("select name, age from person").showspark.sql("select * from person where age > 30").showspark.sql("select count(*) from person where age > 30").showspark.sql("select age, count(*) from person group by age").showspark.sql("select age, count(*) as count from person group by age").showspark.sql("select * from person order by age desc").show/************************** SQL风格语法 end *************************/

DataSet概述

1. DataSet是什么

  • DataSet 是分布式的数据集合,Dataset 提供了强类型支持,也是在 RDD 的每行数据加了类型约束。
  • DataSet 是 DataFrame 的一个扩展,是 SparkSQL1.6 后新增的数据抽象,API 友好
    • 它集中了 RDD 的优点(强类型和可以用强大lambda函数
    • 以及使用了 Spark SQL 优化的执行引擎。
  • DataFrame 是 DataSet 的特例,type DataFrame=DataSet[Row]
    • 可以通过 as 方法将 DataFrame 转换成 DataSet
    • Row 是一个类型,可以是 Person、Animal,所有的表结构信息都用 Row 来表示
  • 优点:
    • DataSet 可以在编译时检查类型
    • 并且是面向对象的编程接口

2. 如何构建DataSet

  • 方式一:通过 sparkSession 调用 createDataset 方法
val ds = spark.createDataset(1 to 10) 	// scala 集合
ds.showval ds = spark.createDataset(sc.textFile("/person.txt"))  //rdd
ds.show
  • 方式二:使用 scala 集合和 rdd 调用 toDS 方法
sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS
  • 方式三:把一个 DataFrame 转换成 DataSet
val ds = dataFrame.as[强类型]
  • 方式四:通过一个 DataSet 转换生成一个新的 DataSet
List(1,2,3,4,5).toDS.map(x => x * 10)

3. RDD、DataFrame、DataSet对比

关系是怎样的?

  • 首先,Spark RDD、DataFrame 和 DataSet 是 Spark 的三类 API,他们的发展过程:
    • DataFrame 是 spark1.3.0 版本提出来的,spark1.6.0 版本又引入了 DateSet;
    • 但是在 spark2.0 版本中,DataFrame 和 DataSet 合并为DataSet。
  • 那么你可能会问了:那么,在2.0以后的版本里,RDD是不是不需要了呢?
    • 答案是:NO!
    • 首先,DataFrame 和 DataSet 都是基于 RDD 的,而且这三者之间可以通过简单的API调用进行无缝切换。

数据有什么区别?

三者 API 特点

  • RDD
    • 优点:相比于传统的 MapReduce 框架,Spark 在 RDD 中内置很多函数操作,group、map、filter等,方便处理结构化或非结构化数据。面向对象编程,直接存储的 java 对象,类型转化也安全。
    • 缺点:由于它基本和 hadoop 一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于 sql 来比非常麻烦。默认采用的是 java 序列号方式,序列化结果比较大,而且数据存储在 java 堆内存中,导致 gc 比较频繁。
  • DataFrame
    • 优点:结构化数据处理非常方便,支持Avro、CSV、ElasticSearch 和 Cassandra 等 kv 数据,也支持 HIVE tables、MySQL 等传统数据表。有针对性的优化:采用 Kryo 序列化;由于数据结构元信息 spark 已经保存,序列化时不需要带上元信息,大大的减少了序列化大小;而且数据保存在堆外内存中,减少了 gc 次数,所以运行更快。hive兼容,支持hql、udf等。
    • 缺点:编译时不能类型转化安全检查,运行时才能确定是否有问题。对于对象支持不友好,rdd 内部数据直接以 java 对象存储,dataframe 内存存储的是 row 对象而不能是自定义对象。
  • DataSet
    • 优点:DateSet 整合了 RDD 和 DataFrame 的优点,支持结构化和非结构化数据。和 RDD 一样,支持自定义对象存储。和 ataFrame 一样,支持结构化数据的 sql 查询。采用了堆外内存存储,gc 友好。类型转化安全,代码友好。

三者如何相互转换

  • 涉及到RDD,DataFrame,DataSet之间操作时,需要隐式转换导入:import spark.implicits._
  • 这里的 spark 不是包名,而是代表了 SparkSession 的那个对象名,所以必须先创建 SparkSession 对象再导入

case class Person(id: String, name: String, age: Int)object Case06_SparkConversion {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val sc = spark.sparkContextsc.setLogLevel("WARN")// 隐式转换import spark.implicits._val rdd = sc.textFile(this.getClass.getClassLoader.getResource("person.txt").getPath).map(x => x.split(" "))// 把rdd与样例类进行关联val personRDD = rdd.map(x => Person(x(0), x(1), x(2).toInt))// 1. rdd -> dfval df1 = personRDD.toDFdf1.show// 2. rdd -> dsval ds1 = personRDD.toDSds1.show// 3. df -> rddval rdd1 = df1.rddprintln(rdd1.collect.toList)// 4. ds -> rddval rdd2 = ds1.rddprintln(rdd2.collect.toList)// 5. ds -> dfval df2: DataFrame = ds1.toDFdf2.show// df -> dsval ds2: Dataset[Person] = df2.as[Person]ds2.showspark.stop()}
}

读取外部数据源

1. SparkSQL读取MySQL数据

  • Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一些列的计算后,还可以将数据再写回关系型数据库中。
  • 代码示例:
/*** 使用 SparkSQL读写MySQL表中的数据*/
object Case07_ReadMySQL {def main(args: Array[String]): Unit = {// 1. 创建 SparkConf 对象val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")// 2. 创建 SparkSession 对象val spark = SparkSession.builder().config(conf).getOrCreate()// 3. 创建 DataFrameval url = "jdbc:mysql://192.168.254.132:3306/mydb?characterEncoding=UTF-8"val tableName = "jobdetail"val props = new Properties()props.setProperty("user", "root")props.setProperty("password", "123456")val mysqlDF: DataFrame = spark.read.jdbc(url, tableName, props)// 4. 读取 MySQL 表中的数据// 4.1 打印schema信息mysqlDF.printSchema()// 4.2 展示数据mysqlDF.show()// 4.3 将dataFrame注册成表mysqlDF.createTempView("job_detail")spark.sql("select * from job_detail where city = '广东'").show()spark.stop()}
}

2. Spark操作CSV文件并将结果写入MySQL

object Case08_ReadCsvWriteMySQL {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()spark.sparkContext.setLogLevel("WARN")val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") // 时间转换.option("header", "true") // 第一行数据都是head(字段属性的意思)
//      .option("multiLine", "true") // 数据可能换行.load(this.getClass.getClassLoader.getResource("data").getPath)df.createOrReplaceTempView("job_detail")spark.sql("select job_name,job_url,job_location,job_salary,job_company,job_experience,job_class,job_given,job_detail,company_type,company_person,search_key,city from job_detail where job_company = '北京无极慧通科技有限公司'").show(80)val props = new Properties()props.put("user", "root")props.put("password", "123456")df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.254.132:3306/mydb?useSSL=false&useUnicode=true&characterEncoding=UTF-8","mydb.jobdetail_copy", props)}
}

3. Spark与Hive整合

  • Spark on hive:Spark 通过 Spark SQL 使用 Hive 的语句操作 Hive,底层运行的还是 spark rdd:
    • 通过 spark sql,加载 hive 的配置文件,获取到 hive 的元数据信息;
    • spark sql 获取到 hive 的元数据信息之后就可以拿到 hive 的所有表的数据;
    • 接下来就可以通过 spark sql 来操作 hive 表中的数据。
  • Hive on spark:将 Hive 查询,从 MapReduce的 MR(Hadoop计算引擎) 操作替换为 spark rdd(spark执行引擎)操作。相对于 spark on hive,这个实现起来则麻烦很多,必须重新编译你的 spark 和导入 jar 包,不过目前大部分使用的是 spark on hive。
spark整合hive——通过SparkSql-shell
  • 拷贝 hive-site.xml 配置文件:将 node03 服务器安装的 hive 目录下 conf 文件夹下面的 hive-site.xml 拷贝到 spark 安装的各个机器节点,node03 执行以下命令进行拷贝
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
  • 拷贝 mysql 驱动包:将 hive 当中 mysql 的连接驱动包拷贝到 spark 安装家目录下的 jars 目录下,node03 执行下命令拷贝 mysql 的 lib 驱动包
$ ll mysql-connector-java-5.1.38.jar 
-rw-rw-r--. 1 hadoop hadoop 983911 12月  6 2021 mysql-connector-java-5.1.38.jar
$ pwd
/bigdata/install/hive-3.1.2/lib
$ scp mysql-connector-java-5.1.38.jar node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
$ scp mysql-connector-java-5.1.38.jar node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/jars/
  • 进入 spark-sql 直接操作 hive 数据库当中的数据:
    • 在 spark2.0 版本后,由于出现了 sparkSession,在初始化 sqlContext 时,会设置默认的 spark.sql.warehouse.dir=spark-warehouse,此时将 hive 与 spark sql 整合完成后,在通过 spark-sql 脚本启动时,会在当前目录下创建一个 spark.sql.warehouse.dirspark-warehouse 的目录,存放由 spark-sql 创建数据库和创建表的数据信息,与之前 hive 的数据信息不是放在同一个路径下(可以互相访问)。但是此时 spark-sql 中表的数据在本地,不利于操作,也不安全。
    • 所有在启动的时候需要加上下面这样一个参数,以保证 spark-sql 启动时不再产生新的存放数据的目录,sparksql 与 hive 最终使用的是 hive 统一存放数据的目录。
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse
  • 在 hive 中创建表并插入数据:
CREATE EXTERNAL TABLE `student`(`ID` bigint COMMENT '',`CreatedBy` string COMMENT '创建人',`CreatedTime` string COMMENT '创建时间',`UpdatedBy` string COMMENT '更新人',`UpdatedTime` string COMMENT '更新时间',`Version` int COMMENT '版本号',`name` string COMMENT '姓名'
) COMMENT '学生表'
PARTITIONED BY (`dt` String COMMENT 'partition')
row format delimited fields terminated by '\t'
location '/student';INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(1, "xxx", "2022-07-12", "", "", 1, "zhangsan");
INSERT INTO TABLE student partition(dt='2022-07-12') VALUES(2, "xxx", "2022-07-12", "", "", 2, "lisi");

  • 通过 shell 方式:node01 直接执行以下命令,进入 spark-sql 交互界面,然后操作 hive 当中的数据
$ spark-sql --master local[2] \
--executor-memory 512m --total-executor-cores 3 \
--conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse# 执行查询
select * from student;

  • 通过脚本方式:编写如下脚本并执行
#!/bin/sh
# 定义 spark sql 提交脚本的头信息
SUBMIT_INFO="spark-sql --master spark://node01:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse"
# 定义一个 sql 语句
SQL="select * from student;"
# 执行 sql 语句,类似于 hive -e sql 
echo "$SUBMIT_INFO"
echo "$SQL"
$SUBMIT_INFO -e "$SQL"
  • 执行:
$ sh spark_on_hive.sh 

spark的thrift server与hive进行远程交互
  • 除了可以通过 spark-shell 来与 hive 进行整合之外,我们也可以通过 spark 的 thrift 服务来远程与 hive 进行交互。
  • node03 执行以下命令修改 hive-site.xml 的配置属性,添加以下几个配置
<property><name>hive.metastore.uris</name><value>thrift://node03:9083</value><description>Thrift URI for the remote metastore</description>
</property>
<property><name>hive.server2.thrift.min.worker.threads</name><value>5</value>
</property>
<property><name>hive.server2.thrift.max.worker.threads</name><value>500</value>
</property>
  • 修改完的配置文件后,分发到其他机器:
$ pwd
/bigdata/install/hive-3.1.2/conf
$ scp hive-site.xml node01:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node02:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
$ scp hive-site.xml node03:/bigdata/install/spark-2.3.3-bin-hadoop2.7/conf/
  • node03 启动 metastore 服务
hive --service metastore
  • node03 执行以下命令启动 spark 的 thrift server:hive 安装在哪一台,就在哪一台服务器启动spark 的 thrift server
$ pwd
/bigdata/install/spark-2.3.3-bin-hadoop2.7/sbin
$ ./start-thriftserver.sh --master local[*] --executor-memory 2g --total-executor-cores 5 
  • 直接使用 beeline 来连接:直接在 node03 服务器上面使用 beeline 来进行连接 spark-sql
$ beeline --color=true 
beeline> !connect jdbc:hive2://node03:10000
Connecting to jdbc:hive2://node03:10000
Enter username for jdbc:hive2://node03:10000: hadoop
Enter password for jdbc:hive2://node03:10000: ******

4. 读写Hive数据

  • 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version>
</dependency>
  • 将服务端配置 hive-site.xml,放入到 idea 的 resources 目录下
  • 代码实现:
object Case09_SparkSQLOnHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").enableHiveSupport() // 启用hive.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse").getOrCreate()val df: DataFrame = spark.sql("select * from student")df.show()// 直接写表达式,通过 insert into 插入df.write.saveAsTable("student1")spark.sql("insert into student1 select * from student")}
}

5. 读写HBase数据

  • 需要添加依赖:
<dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.3.0</version>
</dependency>
  • 创建 HBase 表,并插入数据:
create 'spark_hbase','info'
put 'spark_hbase','0001','info:name','tangseng'
put 'spark_hbase','0001','info:age','30'
put 'spark_hbase','0001','info:sex','0'
put 'spark_hbase','0001','info:addr','beijing'
put 'spark_hbase','0002','info:name','sunwukong'
put 'spark_hbase','0002','info:age','508'
put 'spark_hbase','0002','info:sex','0'
put 'spark_hbase','0002','info:addr','shanghai'
put 'spark_hbase','0003','info:name','zhubajie'
put 'spark_hbase','0003','info:age','715'
put 'spark_hbase','0003','info:sex','0'
put 'spark_hbase','0003','info:addr','shenzhen'
put 'spark_hbase','0004','info:name','bailongma'
put 'spark_hbase','0004','info:age','1256'
put 'spark_hbase','0004','info:sex','0'
put 'spark_hbase','0004','info:addr','donghai'
put 'spark_hbase','0005','info:name','shaheshang'
put 'spark_hbase','0005','info:age','1008'
put 'spark_hbase','0005','info:sex','0'
put 'spark_hbase','0005','info:addr','tiangong'create "spark_hbase_copy",'info'
  • 代码实现:
object Case10_SparkSQLOnHBase {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()spark.sparkContext.setLogLevel("WARN")import spark.implicits._val hconf: Configuration = HBaseConfiguration.createhconf.set(HConstants.ZOOKEEPER_QUORUM, "node01:2181,node02:2181,node03:2181")val hbaseContext = new HBaseContext(spark.sparkContext, hconf)// 定义映射的 catalogval catalog: String = "{\"table\":{\"namespace\":\"default\",\"name\":\"spark_hbase\"},\"rowkey\":\"key\",\"columns\":{\"f0\":{\"cf\":\"rowkey\",\"col\":\"key\",\"type\":\"string\"},\"f1\":{\"cf\":\"info\",\"col\":\"addr\",\"type\":\"string\"},\"f2\":{\"cf\":\"info\",\"col\":\"age\",\"type\":\"boolean\"},\"f3\":{\"cf\":\"info\",\"col\":\"name\",\"type\":\"string\"}}}";// 读取HBase数据val ds: DataFrame = spark.read.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalog).load()ds.show(10)val catalogCopy: String = catalog.replace("spark_hbase", "spark_hbase_out")// 数据写入HBaseds.write.format("org.apache.hadoop.hbase.spark").option(HBaseTableCatalog.tableCatalog, catalogCopy).mode(SaveMode.Overwrite).save()}
}

SparkSQL自定义函数

  • 用户自定义函数类别分为以下三种:
    • ① UDF:输入一行,返回一个结果(一对一)
    • ② UDAF:输入多行,返回一行,这里的是 aggregate,聚合的意思,如果业务复杂,需要自己实现聚合函数
    • ③ UDTF:输入一行,返回多行(一对多),在 SparkSQL 中没有,因为 Spark 中使用 flatMap 即可实现这个功能

1. 自定义UDF函数:一对一

  • 需求:读取深圳二手房成交数据,对房子的年份进行自定义函数处理,如果没有年份,那么就给默认值1990。
object Case11_SparkUDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDFspark.udf.register("house_udf", new UDF1[String, String] {val pattern: Pattern = Pattern.compile("^[0-9]*$")override def call(input: String): String = {val matcher = pattern.matcher(input)if (matcher.matches()) inputelse "1990"}}, DataTypes.StringType)// 使用UDFspark.sql("select house_udf(house_age) from house_sale limit 200").show()spark.stop()}
}

2. 自定义UDAF函数:多对一

  • 需求:自定义UDAF函数,读取深圳二手房数据,然后按照楼层进行分组,求取每个楼层的平均成交金额
object Case12_SparkUDAF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")spark.sql("select floor from house_sale limit 30").show()spark.udf.register("udaf", new MyAverage)spark.sql("select floor, udaf(house_sale_money) from house_sale group by floor").show()df.printSchema()spark.stop()}
}
class MyAverage extends UserDefinedAggregateFunction {// 聚合函数输入函数的数据类型override def inputSchema: StructType = StructType(StructField("floor", DoubleType) :: Nil)// 聚合缓冲区中值的数据类型override def bufferSchema: StructType = {StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)}// 返回值类型override def dataType: DataType = DoubleType// 对于相同输入是否一直返回相同的输出override def deterministic: Boolean = true// 初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {// 用于存储不同类型的楼房的总成交额buffer(0) = 0D// 用于存储不同类型的楼房的总个数buffer(1) = 0L}// 相同Execute间的数据合并(分区内聚合)override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getDouble(0) + input.getDouble(0)buffer(1) = buffer.getLong(1) + 1}}// 不同Execute间的数据合并(分区外聚合)override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果override def evaluate(buffer: Row): Any = buffer.getDouble(0) / buffer.getLong(1)
}

3. 自定义UDTF函数:一对多

  • 需求:自定义UDTF函数,读取深圳二手房数据,然后将 part_place(部分地区)以空格切分进行展示
object Case13_SparkUDTF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._val df: DataFrame = spark.read.format("csv").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "true").option("multiLine", "true").load("/Volumes/F/MyGitHub/bigdata/spark-demo/src/main/resources/深圳链家二手房成交明细.csv")df.createOrReplaceTempView("house_sale")// 注册UDTF算子,这里无法使用sparkSession.udf.register(),注意包全路径spark.sql("CREATE TEMPORARY FUNCTION MySplit as 'com.yw.spark.example.sql.cases.MySplit'")spark.sql("select part_place, MySplit(part_place, ' ') from house_sale limit 50").show()spark.stop()}
}class MySplit extends GenericUDTF {override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {if (args.length != 2) {throw new UDFArgumentLengthException("UserDefinedUDTF takes only two argument")}// 判断第一个参数是不是字符串参数if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter")}// 列名:会被用户传递的覆盖val fieldNames: ArrayList[String] = new ArrayList[String]()fieldNames.add("col1")// 返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数val fieldOIs: ArrayList[ObjectInspector] = new ArrayList[ObjectInspector]()fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)}override def process(objects: Array[AnyRef]): Unit = {// 获取数据val data: String = objects(0).toString// 获取分隔符val splitKey: String = objects(1).toString// 切分数据val words: Array[String] = data.split(splitKey)// 遍历写出words.foreach(x => {// 将数据放入集合val tmp: Array[String] = new Array[String](1)tmp(0) = xforward(tmp)})}override def close(): Unit = {// 没有流操作}
}
  • 相关代码 github 地址:

更多推荐

大数据高级开发工程师——Spark学习笔记(6)

本文发布于:2023-06-28 19:50:15,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/932663.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:学习笔记   工程师   高级   数据   Spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!