SparkSQL 定义查询和使用

编程入门 行业动态 更新时间:2024-10-27 00:33:58

SparkSQL <a href=https://www.elefans.com/category/jswz/34/1771289.html style=定义查询和使用"/>

SparkSQL 定义查询和使用

SparkSQL 创建和使用

        • 方式一
          • 1.创建SparkSession
          • 2.读取数据为DataSet
          • 3.转成DataFrame并指定列名称
          • 4.保存成文件
          • 5.创建临时表使用SQL查询
          • 6.使用API的方式查询
          • 7.SQL方式实现分组求TOPN
          • 8.API方式实现分组求TOPN
        • 方式二
          • 1.创建saprkSession
          • 2.读取文件内容
          • 3.定义schema信息
          • 4.创建DataFrame
          • 5.创建临时表
          • 6.查询数据


方式一

1.创建SparkSession
val spark = SparkSession.builder().master("local[2]").appName("SparkSqlPractice").getOrCreate()
2.读取数据为DataSet
val ds: Dataset[String] = spark.read.textFile("spark-sql/data/access.log")
3.转成DataFrame并指定列名称
val output = "spark-sql/out/access"import spark.implicits._val df: DataFrame = ds.map(x => {val splits = x.split("\\|")val ip = splits(0)val responsetime = splits(2)val httpcode = splits(5)val requestsize = splits(6).toIntval province = splits(16)val city = splits(17)val isp = splits(18)(ip, responsetime, httpcode, requestsize, province, city, isp)}).toDF("ip", "responsetime", "httpcode", "requestsize", "province", "city", "isp")
4.保存成文件
// 保存成各种形式的文件
df.write.format("parquet").mode("overwrite").save(output)
df.write.format("json").mode("overwrite").save(output)
df.write.format("orc").mode("overwrite").save(output)
5.创建临时表使用SQL查询
  // 创建临时表并查询df.createTempView("access")spark.sql("select province,sum(requestsize) total from access group by province").show(10, false)
6.使用API的方式查询
//使用api方式查询
import org.apache.spark.sql.functions._    
df.groupBy("province").agg(sum("requestsize").as("traffics")).sort('traffics.desc).show(false)
7.SQL方式实现分组求TOPN
// SQL 方式实现分组求province访问次数最多的TopN
df.createTempView("access")
val topNSQL =
"""
|select * from (
|select t.*,row_number() over(partition by province order by cnt desc) as r
|from
|(select province,city,count(1) cnt
|	from access
|	group by province,city) t)a
|where a.r < 2 order by a.province,a.r asc
|""".stripMarginspark.sql(topNSQL).show()
8.API方式实现分组求TOPN

todo:


方式二

1.创建saprkSession
val spark = SparkSession.builder().appName("SparkSQLAPP").master("local").getOrCreate()
2.读取文件内容
val studentData: RDD[Row] = spark.sparkContext.textFile("spark-sql/data/student.txt").map(line => {val splits = line.split(",")Row(splits(0).toInt, splits(1), splits(2).toInt)})
3.定义schema信息
val studentSchema = StructType(mutable.ArraySeq(StructField("sno", IntegerType, nullable = false),StructField("sname", StringType, nullable = false),StructField("sage", IntegerType, nullable = false)))
4.创建DataFrame
val studentDF: DataFrame = spark.createDataFrame(studentData, studentSchema)
5.创建临时表
studentDF.createTempView("student")
6.查询数据
spark.sql("select sno,sname,sage from student").show()

更多推荐

SparkSQL 定义查询和使用

本文发布于:2023-07-28 20:16:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1298323.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:定义   SparkSQL

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!