保姆级)Spark气象监测数据分析"/>
(保姆级)Spark气象监测数据分析
目录
写在前面的话
需要的前瞻知识
用到的软件版本
数据集
代码原理(比较重要)
部分代码
Task4函数的代码(主要部分)
主函数代码
运行spark
导包
一些Spark信息的和schema的导入
如果spark链接报错
运行结果
总纲
(保姆级)Spark气象监测数据分析-总纲
写在前面的话
首先这篇博客绝对原创。读者遇到编程中的任何问题可以留言,看到了就会回复,由于有部分读者受困于大数据的生产实习,着急可以加qq1259097853
需要的前瞻知识
这篇博客是假设读者都是已经安装好了Hadoop,Spark,以及对idea插件等,如果在安装这些大数据软件遇到了困难可以根据版本号在CSDN里搜对应的大数据软件安装
用到的软件版本
Hadoop2.7.7;Java1.8.0;sbt1.4.0;Spark2.4.0;Hive2.1.1;ZooKeeper3.5.10;Python3.7.9
数据集
数据集
也可点击下面的链接
链接:=494j
提取码:494j
代码原理(比较重要)
该部分数据分析使用res.csv,即污染物浓度的逐小时数据。其中使用到了Spark ML中计算皮尔逊相关系数的函数包,主要步骤如下:
(1)使用sql语句,从res.csv中选择SO2和NO2的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(SO2, NO2, "pearson")函数,计算其相关系数。
计算结果为:SO2浓度与NO2浓度的皮尔逊相关系数为0.41。
因为PM10浓度的变化与空气含水量密切相关,比如当相对湿度大于60%以上时, PM10颗粒会吸湿增大,所以该部分试图分析PM10浓度与空气湿度的关系。主要步骤参考上一部分:
(1)使用sql语句,从res.csv中选择PM10浓度和空气湿度的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(PM10, Air, "pearson")函数,计算其相关系数。
计算结果为:PM10浓度与空气湿度的皮尔逊相关系数为 -0.41。
因为云量及其云内液体的吸收对O3含量有一定影响,所以该部分试图对O3浓度与云量的关系进行分析,主要使用斯皮尔曼相关系数。主要步骤参考上一部分:
(1)使用sql语句,从res.csv中选择O3浓度和云量的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(O3, Cloud, "spearman")函数,计算其相关系数。
计算结果为:O3浓度与云量的斯皮尔曼相关系数为 0.04,关联性不高。
部分代码
对于代码我分成了几个部分,有部分代码和之前只有细微差别
Task4函数的代码(主要部分)
def Task4(df: DataFrame): Unit = {df.createOrReplaceTempView("SO2_NO2")val SO2_NO2_1 = spark.sql("select `监测时间`,`SO2监测浓度(μg/m³)`,`NO2监测浓度(μg/m³)` from SO2_NO2 where `SO2监测浓度(μg/m³)` > 0 ")SO2_NO2_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/SO2_NO2.csv")SO2_NO2_1.describe().showval SO2_NO2_2= spark.sql("select`SO2监测浓度(μg/m³)`,`NO2监测浓度(μg/m³)` from SO2_NO2 where `SO2监测浓度(μg/m³)` > 0 ")val rdd_real1 = SO2_NO2_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val SO2 = rdd_real1.map(x=>x._1.toDouble )val NO2 = rdd_real1.map(x=>x._2.toDouble )val SO2withNO2cor_pearson:Double = Statistics.corr(SO2, NO2, "pearson")df.createOrReplaceTempView("PM25_Humidity")val PM25_Humidity_1 = spark.sql("select `监测时间`,`PM2.5监测浓度(μg/m³)`,`湿度(%)` from PM25_Humidity where `PM2.5监测浓度(μg/m³)` > 0 ")PM25_Humidity_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/PM25_Humidity.csv")PM25_Humidity_1.describe().showval PM25_Humidity_2= spark.sql("select `PM2.5监测浓度(μg/m³)`,`湿度(%)` from PM25_Humidity where `PM2.5监测浓度(μg/m³)` > 0")val rdd_real2 = PM25_Humidity_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val PM25 = rdd_real2.map(x=>x._1.toDouble )val Humidity = rdd_real2.map(x=>x._2.toDouble )val PM25_Humiditycor_pearson:Double = Statistics.corr(PM25, Humidity, "pearson")df.createOrReplaceTempView("PM10_Humidity")val PM10_Humidity_1 = spark.sql("select `监测时间`,`PM10监测浓度(μg/m³)`,`湿度(%)` from PM10_Humidity where `PM10监测浓度(μg/m³)` > 0 ")PM10_Humidity_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/PM10_Humidity.csv")PM10_Humidity_1.describe().showval PM10_Humidity_2= spark.sql("select `PM10监测浓度(μg/m³)`,`湿度(%)` from PM10_Humidity where `PM10监测浓度(μg/m³)` > 0")val rdd_real3 = PM10_Humidity_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val PM10 = rdd_real3.map(x=>x._1.toDouble )val Humidity1 = rdd_real3.map(x=>x._2.toDouble )val PM10_Humiditycor_pearson:Double = Statistics.corr(PM10, Humidity1, "pearson")df.createOrReplaceTempView("O3_Cloud")val O3_Cloud_1 = spark.sql("select `监测时间`,`O3监测浓度(μg/m³)`,`云量` from O3_Cloud where `O3监测浓度(μg/m³)` > 0 ")O3_Cloud_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/O3_Cloud.csv")O3_Cloud_1.describe().showval O3_Cloud_2= spark.sql("select `O3监测浓度(μg/m³)`,`云量` from O3_Cloud where `O3监测浓度(μg/m³)` > 0 ")val rdd_real4 = O3_Cloud_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val O3 = rdd_real4.map(x=>x._1.toDouble )val Cloud = rdd_real4.map(x=>x._2.toDouble )val O3withCloud_cor_pearson:Double = Statistics.corr(O3, Cloud, "pearson")println("SO2 with NO2 Pearson:",SO2withNO2cor_pearson)println("PM25 with Humidity Pearson:",PM25_Humiditycor_pearson)println("PM10 with Humidity Pearson:",PM10_Humiditycor_pearson)println("O3 with Cloud Pearson:",O3withCloud_cor_pearson)// val SO2_2 = SO2_1.orderBy("监测时间")
// SO2_2.write.option("header", true).mode("overwrite").csv("file:///root/work/Task1/SO2_20.csv")
// // SO2_1.show()
// println("筛选六大污染物浓度排名前20的时段:")
// SO2_2.show()}
主函数代码
def main(args: Array[String]): Unit = {// Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
// Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)Logger.getLogger("org").setLevel(Level.ERROR)println("Test Begin")
// println(SparkSession.getClass)val df = spark.read.schema(schema).option("header", "true").csv("file:///root/res.csv")
// df.show()
// Task1(df)
// Task2(df)val df_data2 = spark.read.schema(schema_data2).option("header", "true").csv("file:///root/data2.csv")
// df_data2.show()
// Task3(df_data2)Task4(df)}
运行spark
[root@master ~]# ./spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
导包
对于包依赖的安装我会过几天更新,具体是步骤0.1,如果有人看到了这里但是我忘了更新可以提醒我!
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.log4j.{Level,Logger}
import org.apache.spark.mllib.stat.Statisticsimport scala.collection.mutable.ArrayBuffer
一些Spark信息的和schema的导入
val schema = StructType(Array(StructField("", FloatType),StructField("监测时间", StringType),StructField("SO2监测浓度(μg/m³)", FloatType),StructField("NO2监测浓度(μg/m³)", FloatType),StructField("PM10监测浓度(μg/m³)", FloatType),StructField("PM2.5监测浓度(μg/m³)", FloatType),StructField("O3监测浓度(μg/m³)", FloatType),StructField("CO监测浓度(mg/m³)", FloatType),StructField("温度(℃)", FloatType),StructField("湿度(%)", FloatType),StructField("气压(MBar)", FloatType),StructField("风速(m/s)", FloatType),StructField("风向(°)", FloatType),StructField("云量", FloatType),StructField("长波辐射(W/m²)", FloatType)))val schema_data2 = StructType(Array(StructField("监测日期", StringType),StructField("SO2监测浓度(μg/m3)", FloatType),StructField("NO2监测浓度(μg/m3)", FloatType),StructField("PM10监测浓度(μg/m3)", FloatType),StructField("PM2.5监测浓度(μg/m3)", FloatType),StructField("O3最大八小时滑动平均监测浓度(μg/m3)", FloatType),StructField("CO监测浓度(mg/m3)", FloatType)))val spark = SparkSession.builder().master("spark://192.168.244.130:7077").getOrCreate()
如果spark链接报错
如果链接spark的时候会失败,可以使用下面的代码替换之前的(但是这个只能是测试的时候用,是假的Spark,具体报错的debug过几天我也会做,忘了有读者需要可以提醒我)
val spark = SparkSession.builder().master("local[2]").getOrCreate()
运行结果
Test Begin
+-------+--------------+------------------+------------------+
|summary| 监测时间|SO2监测浓度(μg/m³)|NO2监测浓度(μg/m³)|
+-------+--------------+------------------+------------------+
| count| 18932| 18932| 18932|
| mean| null| 7.038664694696809| 32.96862455102472|
| stddev| null|3.6722874731107695|24.059174407491813|
| min|2019-10-1 0:00| 1.0| 2.0|
| max| 2021-7-9 9:00| 47.0| 211.0|
+-------+--------------+------------------+------------------+23/09/20 05:52:11 WARN BLAS: Failed to load implementation from: com.github.fommillib.NativeSystemBLAS
23/09/20 05:52:11 WARN BLAS: Failed to load implementation from: com.github.fommillib.NativeRefBLAS
+-------+--------------+--------------------+------------------+
|summary| 监测时间|PM2.5监测浓度(μg/m³)| 湿度(%)|
+-------+--------------+--------------------+------------------+
| count| 18856| 18856| 18856|
| mean| null| 24.079974543911753| 69.09917267713195|
| stddev| null| 18.29486149045867|16.135189520717272|
| min|2019-10-1 0:00| 1.0| 14.0|
| max| 2021-7-9 9:00| 163.0| 99.0|
+-------+--------------+--------------------+------------------++-------+--------------+-------------------+------------------+
|summary| 监测时间|PM10监测浓度(μg/m³)| 湿度(%)|
+-------+--------------+-------------------+------------------+
| count| 19140| 19140| 19140|
| mean| null| 44.20491118077325| 69.26734587251829|
| stddev| null| 27.377230327250064|16.126409365072348|
| min|2019-10-1 0:00| 1.0| 14.0|
| max| 2021-7-9 9:00| 217.0| 99.0|
+-------+--------------+-------------------+------------------++-------+--------------+------------------+------------------+
|summary| 监测时间| O3监测浓度(μg/m³)| 云量|
+-------+--------------+------------------+------------------+
| count| 17965| 17965| 17965|
| mean| null|57.839020317283605|0.4672175035878058|
| stddev| null| 50.68049641548892|0.3282385023346934|
| min|2019-10-1 0:00| 1.0| 0.0|
| max| 2021-7-9 9:00| 405.0| 1.0|
+-------+--------------+------------------+------------------+(SO2 with NO2 Pearson:,0.4104482705759166)
(PM25 with Humidity Pearson:,-0.29398298555150565)
(PM10 with Humidity Pearson:,-0.41704389566755495)
(O3 with Cloud Pearson:,0.042812164393700905)Process finished with exit code 0
更多推荐
(保姆级)Spark气象监测数据分析
发布评论