// 三范式(面向对象,面向过程,面向函数)
package Batch2

import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
import org.apache.spark.sql.functions._
//三范式(面向对象,面向过程,面向函数)
/**
 * Demo job: parses one nested "devices" JSON document with an explicit schema,
 * explodes the per-device maps (thermostats, smoke/CO alarms, cameras) into rows,
 * flattens their fields into columns and prints the inner join of thermostats and
 * cameras on their software version.
 */
object JSONAnalysis2 {

  /** Sample payload: a single JSON document holding one device of each kind. */
  private val sampleJson: String =
    """{
      |  "devices": {
      |    "thermostats": {
      |      "peyiJNo0IldT2YlIVtYaGQ": {
      |        "device_id": "peyiJNo0IldT2YlIVtYaGQ",
      |        "locale": "en-US",
      |        "software_version": "4.0",
      |        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
      |        "where_name": "Hallway Upstairs",
      |        "last_connection": "2016-10-31T23:59:59.000Z",
      |        "is_online": true,
      |        "can_cool": true,
      |        "can_heat": true,
      |        "is_using_emergency_heat": true,
      |        "has_fan": true,
      |        "fan_timer_active": true,
      |        "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
      |        "temperature_scale": "F",
      |        "target_temperature_f": 72,
      |        "target_temperature_high_f": 80,
      |        "target_temperature_low_f": 65,
      |        "eco_temperature_high_f": 80,
      |        "eco_temperature_low_f": 65,
      |        "away_temperature_high_f": 80,
      |        "away_temperature_low_f": 65,
      |        "hvac_mode": "heat",
      |        "humidity": 40,
      |        "hvac_state": "heating",
      |        "is_locked": true,
      |        "locked_temp_min_f": 65,
      |        "locked_temp_max_f": 80
      |      }
      |    },
      |    "smoke_co_alarms": {
      |      "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
      |        "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
      |        "locale": "en-US",
      |        "software_version": "1.01",
      |        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
      |        "where_name": "Jane's Room",
      |        "last_connection": "2016-10-31T23:59:59.000Z",
      |        "is_online": true,
      |        "battery_health": "ok",
      |        "co_alarm_state": "ok",
      |        "smoke_alarm_state": "ok",
      |        "is_manual_test_active": true,
      |        "last_manual_test_time": "2016-10-31T23:59:59.000Z",
      |        "ui_color_state": "gray"
      |      }
      |    },
      |    "cameras": {
      |      "awJo6rH0IldT2YlIVtYaGQ": {
      |        "device_id": "awJo6rH",
      |        "software_version": "4.0",
      |        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
      |        "where_name": "Foyer",
      |        "is_online": true,
      |        "is_streaming": true,
      |        "is_audio_input_enabled": true,
      |        "last_is_online_change": "2016-12-29T18:42:00.000Z",
      |        "is_video_history_enabled": true,
      |        "web_url": "=access_token",
      |        "app_url": "nestmobile://cameras/device_id?auth=access_token",
      |        "is_public_share_enabled": true,
      |        "activity_zones": { "name": "Walkway", "id": 244083 },
      |        "last_event": "2016-10-31T23:59:59.000Z"
      |      }
      |    }
      |  }
      |}""".stripMargin

  /** Field layout of one entry in the `thermostats` map. */
  private val thermostatSchema: StructType = new StructType()
    .add("device_id", StringType)
    .add("locale", StringType)
    .add("software_version", StringType)
    .add("structure_id", StringType)
    .add("where_name", StringType)
    .add("last_connection", StringType)
    .add("is_online", BooleanType)
    .add("can_cool", BooleanType)
    .add("can_heat", BooleanType)
    .add("is_using_emergency_heat", BooleanType)
    .add("has_fan", BooleanType)
    .add("fan_timer_active", BooleanType)
    .add("fan_timer_timeout", StringType)
    .add("temperature_scale", StringType)
    .add("target_temperature_f", LongType)
    .add("target_temperature_high_f", LongType)
    .add("target_temperature_low_f", LongType)
    .add("eco_temperature_high_f", LongType)
    .add("eco_temperature_low_f", LongType)
    .add("away_temperature_high_f", LongType)
    .add("away_temperature_low_f", LongType)
    .add("hvac_mode", StringType)
    .add("humidity", LongType)
    .add("hvac_state", StringType)
    .add("is_locked", BooleanType)
    .add("locked_temp_min_f", LongType)
    .add("locked_temp_max_f", LongType)

  /** Field layout of one entry in the `smoke_co_alarms` map. */
  private val smokeAlarmSchema: StructType = new StructType()
    .add("device_id", StringType)
    .add("locale", StringType)
    .add("software_version", StringType)
    .add("structure_id", StringType)
    .add("where_name", StringType)
    .add("last_connection", StringType)
    .add("is_online", BooleanType)
    .add("battery_health", StringType)
    .add("co_alarm_state", StringType)
    .add("smoke_alarm_state", StringType)
    .add("is_manual_test_active", BooleanType)
    .add("last_manual_test_time", StringType)
    .add("ui_color_state", StringType)

  /** Field layout of one entry in the `cameras` map. */
  private val cameraSchema: StructType = new StructType()
    .add("device_id", StringType)
    .add("software_version", StringType)
    .add("structure_id", StringType)
    .add("where_name", StringType)
    .add("is_online", BooleanType)
    .add("is_streaming", BooleanType)
    .add("is_audio_input_enabled", BooleanType)
    .add("last_is_online_change", StringType)
    .add("is_video_history_enabled", BooleanType)
    .add("web_url", StringType)
    .add("app_url", StringType)
    .add("is_public_share_enabled", BooleanType)
    // NOTE(review): the sample document carries a numeric id here while the schema
    // declares a string - confirm that reading it as a string is intended.
    .add("activity_zones", new StructType().add("name", StringType).add("id", StringType))
    .add("last_event", StringType)

  /** Schema of the whole document: `devices` holds one map (device key -> struct) per device kind. */
  private val schema: StructType = new StructType()
    .add("devices", new StructType()
      .add("thermostats", MapType(StringType, thermostatSchema))
      .add("smoke_co_alarms", MapType(StringType, smokeAlarmSchema))
      .add("cameras", MapType(StringType, cameraSchema)))

  /**
   * Runs the demo on a local Spark context and prints the joined result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Spark configuration: local mode on all cores, app name taken from this class.
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // Stop the context even when a step below throws.
    try {
      val sql = new SQLContext(sc)
      import sql.implicits._

      // One Dataset element == one JSON document.
      val dataSet1: Dataset[String] = Seq(sampleJson).toDS()

      // Parse the document with the explicit schema.
      val dataDF: DataFrame = sql.read.schema(schema).json(dataSet1)

      // Pick the three device maps out of `devices`.
      val thermDF = dataDF.select($"devices".getItem("thermostats").alias("therm"))
      val smokeDF = dataDF.select($"devices".getItem("smoke_co_alarms").alias("smoke"))
      val camerDF = dataDF.select($"devices".getItem("cameras").alias("camer"))

      // Explode each map into (key, value) rows, one row per device.
      val thermEX = thermDF.select(explode($"therm"))
      val smokeEX = smokeDF.select(explode($"smoke"))
      val camerEX = camerDF.select(explode($"camer"))

      // Extracts one named field from the exploded `value` struct.
      def field(name: String) = $"value".getItem(name)

      // Thermostats: every field of the struct; software_version is exposed as `version`
      // because it is the join key below.
      val thermDFF = thermEX.select(
        field("device_id"),
        field("locale"),
        field("software_version").alias("version"),
        field("structure_id"),
        field("where_name"),
        field("last_connection"),
        field("is_online"),
        field("can_cool"),
        field("can_heat"),
        field("is_using_emergency_heat"),
        field("has_fan"),
        field("fan_timer_active"),
        field("fan_timer_timeout"),
        field("temperature_scale"),
        field("target_temperature_f"),
        field("target_temperature_high_f"),
        field("target_temperature_low_f"),
        field("eco_temperature_high_f"),
        field("eco_temperature_low_f"),
        field("away_temperature_high_f"),
        field("away_temperature_low_f"),
        field("hvac_mode"),
        field("humidity"),
        field("hvac_state"),
        field("is_locked"),
        field("locked_temp_min_f"),
        field("locked_temp_max_f"))

      // Smoke/CO alarms: every field of the struct. Built alongside the other two
      // projections but not part of the join printed below.
      val smokeDFF = smokeEX.select(
        field("device_id"),
        field("locale"),
        field("software_version"),
        field("structure_id"),
        field("where_name"),
        field("last_connection"),
        field("is_online"),
        field("battery_health"),
        field("co_alarm_state"),
        field("smoke_alarm_state"),
        field("is_manual_test_active"),
        field("last_manual_test_time"),
        field("ui_color_state"))

      // Cameras: every field, with the nested activity_zones struct flattened into
      // its name and id; software_version is exposed as `version` for the join.
      val camerDFF: DataFrame = camerEX.select(
        field("device_id"),
        field("software_version").alias("version"),
        field("structure_id"),
        field("where_name"),
        field("is_online"),
        field("is_streaming"),
        field("is_audio_input_enabled"),
        field("last_is_online_change"),
        field("is_video_history_enabled"),
        field("web_url"),
        field("app_url"),
        field("is_public_share_enabled"),
        field("activity_zones").getItem("name"),
        field("activity_zones").getItem("id"),
        field("last_event"))

      // Inner-join cameras and thermostats on software version and print untruncated rows.
      thermDFF.join(camerDFF, "version").show(false)
    } finally {
      // Release Spark resources.
      sc.stop()
    }
  }
}
// 更多推荐
// 三范式(面向对象,面向过程,面向函数)
// 发布评论