(保姆级)Spark气象监测数据分析

编程入门 行业动态 更新时间:2024-10-12 05:48:04

(<a href=https://www.elefans.com/category/jswz/34/1768653.html style=保姆级)Spark气象监测数据分析"/>

(保姆级)Spark气象监测数据分析

目录

 写在前面的话

需要的前瞻知识

用到的软件版本

数据集

代码原理(比较重要)

部分代码

Task4函数的代码(主要部分)

 主函数代码

运行spark

导包

一些Spark信息的和schema的导入

如果spark链接报错

运行结果

 总纲

(保姆级)Spark气象监测数据分析-总纲

写在前面的话

首先这篇博客绝对原创。读者遇到编程中的任何问题可以留言,看到了就会回复,由于有部分读者受困于大数据的生产实习,着急可以加qq1259097853

需要的前瞻知识

这篇博客是假设读者都是已经安装好了Hadoop,Spark,以及对idea插件等,如果在安装这些大数据软件遇到了困难可以根据版本号在CSDN里搜对应的大数据软件安装

用到的软件版本

Hadoop2.7.7;Java1.8.0;sbt1.4.0;Spark2.4.0;Hive2.1.1;ZooKeeper3.5.10;Python3.7.9

数据集

数据集

也可点击下面的链接

链接:=494j 
提取码:494j

代码原理(比较重要)

该部分数据分析使用res.csv,即污染物浓度的逐小时数据。其中使用到了Spark ML中计算皮尔逊相关系数的函数包,主要步骤如下:
(1)使用sql语句,从res.csv中选择SO2和NO2的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(SO2, NO2, "pearson")函数,计算其相关系数。
计算结果为:SO2浓度与NO2浓度的皮尔逊相关系数为0.41。

因为PM10浓度的变化与空气含水量密切相关,比如当相对湿度大于60%以上时, PM10颗粒会吸湿增大,所以该部分试图分析PM10浓度与空气湿度的关系。主要步骤参考上一部分:
(1)使用sql语句,从res.csv中选择PM10浓度和空气湿度的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(PM10, Air, "pearson")函数,计算其相关系数。
计算结果为:PM10浓度与空气湿度的皮尔逊相关系数为 -0.41。

因为云量及其云内液体的吸收对O3含量有一定影响,所以该部分试图对O3浓度与云量的关系进行分析,主要使用斯皮尔曼相关系数。主要步骤参考上一部分:
(1)使用sql语句,从res.csv中选择O3浓度和云量的逐小时浓度数据;
(2)保存其数据至csv文件,供后续可视化使用;
(3)使用describe函数,计算其数量、均值、标准差以及最值;
(4)将选择的数据映射为Double类型;
(5)使用Statistics.corr(O3, Cloud, "spearman")函数,计算其相关系数。
计算结果为:O3浓度与云量的斯皮尔曼相关系数为 0.04,关联性不高。

部分代码

对于代码我分成了几个部分,有部分代码和之前只有细微差别

Task4函数的代码(主要部分)
 def Task4(df: DataFrame): Unit = {df.createOrReplaceTempView("SO2_NO2")val SO2_NO2_1 = spark.sql("select `监测时间`,`SO2监测浓度(μg/m³)`,`NO2监测浓度(μg/m³)` from SO2_NO2 where `SO2监测浓度(μg/m³)` > 0 ")SO2_NO2_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/SO2_NO2.csv")SO2_NO2_1.describe().showval SO2_NO2_2= spark.sql("select`SO2监测浓度(μg/m³)`,`NO2监测浓度(μg/m³)` from SO2_NO2 where `SO2监测浓度(μg/m³)` > 0 ")val rdd_real1 = SO2_NO2_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val SO2 = rdd_real1.map(x=>x._1.toDouble )val NO2 = rdd_real1.map(x=>x._2.toDouble )val SO2withNO2cor_pearson:Double = Statistics.corr(SO2, NO2, "pearson")df.createOrReplaceTempView("PM25_Humidity")val PM25_Humidity_1 = spark.sql("select `监测时间`,`PM2.5监测浓度(μg/m³)`,`湿度(%)` from PM25_Humidity where `PM2.5监测浓度(μg/m³)` > 0 ")PM25_Humidity_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/PM25_Humidity.csv")PM25_Humidity_1.describe().showval PM25_Humidity_2= spark.sql("select `PM2.5监测浓度(μg/m³)`,`湿度(%)` from PM25_Humidity where `PM2.5监测浓度(μg/m³)` > 0")val rdd_real2 = PM25_Humidity_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val PM25 = rdd_real2.map(x=>x._1.toDouble )val Humidity = rdd_real2.map(x=>x._2.toDouble )val PM25_Humiditycor_pearson:Double = Statistics.corr(PM25, Humidity, "pearson")df.createOrReplaceTempView("PM10_Humidity")val PM10_Humidity_1 = spark.sql("select `监测时间`,`PM10监测浓度(μg/m³)`,`湿度(%)` from PM10_Humidity where `PM10监测浓度(μg/m³)` > 0 ")PM10_Humidity_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/PM10_Humidity.csv")PM10_Humidity_1.describe().showval PM10_Humidity_2= spark.sql("select `PM10监测浓度(μg/m³)`,`湿度(%)` from PM10_Humidity where `PM10监测浓度(μg/m³)` > 0")val rdd_real3 = PM10_Humidity_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val PM10 = rdd_real3.map(x=>x._1.toDouble )val Humidity1 = rdd_real3.map(x=>x._2.toDouble )val PM10_Humiditycor_pearson:Double = Statistics.corr(PM10, Humidity1, "pearson")df.createOrReplaceTempView("O3_Cloud")val O3_Cloud_1 = spark.sql("select `监测时间`,`O3监测浓度(μg/m³)`,`云量` from O3_Cloud where `O3监测浓度(μg/m³)` > 0 ")O3_Cloud_1.write.option("header",true).mode("overwrite").csv("file:/root/work/Task4/O3_Cloud.csv")O3_Cloud_1.describe().showval O3_Cloud_2= spark.sql("select `O3监测浓度(μg/m³)`,`云量` from O3_Cloud where `O3监测浓度(μg/m³)` > 0 ")val rdd_real4 = O3_Cloud_2.rdd.map(x=>(x(0).toString.toDouble ,x(1).toString.toDouble ))val O3 = rdd_real4.map(x=>x._1.toDouble )val Cloud = rdd_real4.map(x=>x._2.toDouble )val O3withCloud_cor_pearson:Double = Statistics.corr(O3, Cloud, "pearson")println("SO2 with NO2 Pearson:",SO2withNO2cor_pearson)println("PM25 with Humidity Pearson:",PM25_Humiditycor_pearson)println("PM10 with Humidity Pearson:",PM10_Humiditycor_pearson)println("O3 with Cloud Pearson:",O3withCloud_cor_pearson)//    val SO2_2 = SO2_1.orderBy("监测时间")
//    SO2_2.write.option("header", true).mode("overwrite").csv("file:///root/work/Task1/SO2_20.csv")
//    //    SO2_1.show()
//    println("筛选六大污染物浓度排名前20的时段:")
//    SO2_2.show()}
 主函数代码
  def main(args: Array[String]): Unit = {//    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
//    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)Logger.getLogger("org").setLevel(Level.ERROR)println("Test Begin")
//    println(SparkSession.getClass)val df = spark.read.schema(schema).option("header", "true").csv("file:///root/res.csv")
//    df.show()
//    Task1(df)
//    Task2(df)val df_data2 = spark.read.schema(schema_data2).option("header", "true").csv("file:///root/data2.csv")
//    df_data2.show()
//    Task3(df_data2)Task4(df)}
运行spark
[root@master ~]# ./spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
导包

对于包依赖的安装我会过几天更新,具体是步骤0.1,如果有人看到了这里但是我忘了更新可以提醒我!

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.log4j.{Level,Logger}
import org.apache.spark.mllib.stat.Statisticsimport scala.collection.mutable.ArrayBuffer
一些Spark信息的和schema的导入
  val schema = StructType(Array(StructField("", FloatType),StructField("监测时间", StringType),StructField("SO2监测浓度(μg/m³)", FloatType),StructField("NO2监测浓度(μg/m³)", FloatType),StructField("PM10监测浓度(μg/m³)", FloatType),StructField("PM2.5监测浓度(μg/m³)", FloatType),StructField("O3监测浓度(μg/m³)", FloatType),StructField("CO监测浓度(mg/m³)", FloatType),StructField("温度(℃)", FloatType),StructField("湿度(%)", FloatType),StructField("气压(MBar)", FloatType),StructField("风速(m/s)", FloatType),StructField("风向(°)", FloatType),StructField("云量", FloatType),StructField("长波辐射(W/m²)", FloatType)))val schema_data2 = StructType(Array(StructField("监测日期", StringType),StructField("SO2监测浓度(μg/m3)", FloatType),StructField("NO2监测浓度(μg/m3)", FloatType),StructField("PM10监测浓度(μg/m3)", FloatType),StructField("PM2.5监测浓度(μg/m3)", FloatType),StructField("O3最大八小时滑动平均监测浓度(μg/m3)", FloatType),StructField("CO监测浓度(mg/m3)", FloatType)))val spark = SparkSession.builder().master("spark://192.168.244.130:7077").getOrCreate()
如果spark链接报错

如果链接spark的时候会失败,可以使用下面的代码替换之前的(但是这个只能是测试的时候用,是假的Spark,具体报错的debug过几天我也会做,忘了有读者需要可以提醒我)

  val spark = SparkSession.builder().master("local[2]").getOrCreate()
运行结果
Test Begin
+-------+--------------+------------------+------------------+
|summary|      监测时间|SO2监测浓度(μg/m³)|NO2监测浓度(μg/m³)|
+-------+--------------+------------------+------------------+
|  count|         18932|             18932|             18932|
|   mean|          null| 7.038664694696809| 32.96862455102472|
| stddev|          null|3.6722874731107695|24.059174407491813|
|    min|2019-10-1 0:00|               1.0|               2.0|
|    max| 2021-7-9 9:00|              47.0|             211.0|
+-------+--------------+------------------+------------------+23/09/20 05:52:11 WARN BLAS: Failed to load implementation from: com.github.fommillib.NativeSystemBLAS
23/09/20 05:52:11 WARN BLAS: Failed to load implementation from: com.github.fommillib.NativeRefBLAS
+-------+--------------+--------------------+------------------+
|summary|      监测时间|PM2.5监测浓度(μg/m³)|           湿度(%)|
+-------+--------------+--------------------+------------------+
|  count|         18856|               18856|             18856|
|   mean|          null|  24.079974543911753| 69.09917267713195|
| stddev|          null|   18.29486149045867|16.135189520717272|
|    min|2019-10-1 0:00|                 1.0|              14.0|
|    max| 2021-7-9 9:00|               163.0|              99.0|
+-------+--------------+--------------------+------------------++-------+--------------+-------------------+------------------+
|summary|      监测时间|PM10监测浓度(μg/m³)|           湿度(%)|
+-------+--------------+-------------------+------------------+
|  count|         19140|              19140|             19140|
|   mean|          null|  44.20491118077325| 69.26734587251829|
| stddev|          null| 27.377230327250064|16.126409365072348|
|    min|2019-10-1 0:00|                1.0|              14.0|
|    max| 2021-7-9 9:00|              217.0|              99.0|
+-------+--------------+-------------------+------------------++-------+--------------+------------------+------------------+
|summary|      监测时间| O3监测浓度(μg/m³)|              云量|
+-------+--------------+------------------+------------------+
|  count|         17965|             17965|             17965|
|   mean|          null|57.839020317283605|0.4672175035878058|
| stddev|          null| 50.68049641548892|0.3282385023346934|
|    min|2019-10-1 0:00|               1.0|               0.0|
|    max| 2021-7-9 9:00|             405.0|               1.0|
+-------+--------------+------------------+------------------+(SO2 with NO2 Pearson:,0.4104482705759166)
(PM25 with Humidity Pearson:,-0.29398298555150565)
(PM10 with Humidity Pearson:,-0.41704389566755495)
(O3 with Cloud Pearson:,0.042812164393700905)Process finished with exit code 0

更多推荐

(保姆级)Spark气象监测数据分析

本文发布于:2024-03-10 09:42:40,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1727623.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:保姆   气象   数据   Spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!