定义查询和使用"/>
SparkSQL 定义查询和使用
SparkSQL 创建和使用
- 方式一
- 1.创建SparkSession
- 2.读取数据为DataSet
- 3.转成DataFrame并指定列名称
- 4.保存成文件
- 5.创建临时表使用SQL查询
- 6.使用API的方式查询
- 7.SQL方式实现分组求TOPN
- 8.API方式实现分组求TOPN
- 方式二
- 1.创建saprkSession
- 2.读取文件内容
- 3.定义schema信息
- 4.创建DataFrame
- 5.创建临时表
- 6.查询数据
方式一
1.创建SparkSession
val spark = SparkSession.builder().master("local[2]").appName("SparkSqlPractice").getOrCreate()
2.读取数据为DataSet
val ds: Dataset[String] = spark.read.textFile("spark-sql/data/access.log")
3.转成DataFrame并指定列名称
val output = "spark-sql/out/access"import spark.implicits._val df: DataFrame = ds.map(x => {val splits = x.split("\\|")val ip = splits(0)val responsetime = splits(2)val httpcode = splits(5)val requestsize = splits(6).toIntval province = splits(16)val city = splits(17)val isp = splits(18)(ip, responsetime, httpcode, requestsize, province, city, isp)}).toDF("ip", "responsetime", "httpcode", "requestsize", "province", "city", "isp")
4.保存成文件
// 保存成各种形式的文件
df.write.format("parquet").mode("overwrite").save(output)
df.write.format("json").mode("overwrite").save(output)
df.write.format("orc").mode("overwrite").save(output)
5.创建临时表使用SQL查询
// 创建临时表并查询df.createTempView("access")spark.sql("select province,sum(requestsize) total from access group by province").show(10, false)
6.使用API的方式查询
//使用api方式查询
import org.apache.spark.sql.functions._
df.groupBy("province").agg(sum("requestsize").as("traffics")).sort('traffics.desc).show(false)
7.SQL方式实现分组求TOPN
// SQL 方式实现分组求province访问次数最多的TopN
df.createTempView("access")
val topNSQL =
"""
|select * from (
|select t.*,row_number() over(partition by province order by cnt desc) as r
|from
|(select province,city,count(1) cnt
| from access
| group by province,city) t)a
|where a.r < 2 order by a.province,a.r asc
|""".stripMarginspark.sql(topNSQL).show()
8.API方式实现分组求TOPN
todo:
方式二
1.创建saprkSession
val spark = SparkSession.builder().appName("SparkSQLAPP").master("local").getOrCreate()
2.读取文件内容
val studentData: RDD[Row] = spark.sparkContext.textFile("spark-sql/data/student.txt").map(line => {val splits = line.split(",")Row(splits(0).toInt, splits(1), splits(2).toInt)})
3.定义schema信息
val studentSchema = StructType(mutable.ArraySeq(StructField("sno", IntegerType, nullable = false),StructField("sname", StringType, nullable = false),StructField("sage", IntegerType, nullable = false)))
4.创建DataFrame
val studentDF: DataFrame = spark.createDataFrame(studentData, studentSchema)
5.创建临时表
studentDF.createTempView("student")
6.查询数据
spark.sql("select sno,sname,sage from student").show()
更多推荐
SparkSQL 定义查询和使用
发布评论