SparkSql处理嵌套json数据

编程入门 行业动态 更新时间:2024-10-28 05:13:30

SparkSql处理<a href="https://www.elefans.com/category/jswz/34/1771299.html">嵌套json数据</a>

SparkSql处理嵌套json数据

一、数据准备:

{
"dc_id": "dc-101",
"source": {"sensor-igauge": {"id": 10,"ip": "68.28.91.22","description": "Sensor attached to the container ceilings","temp":35,"c02_level": 1475,"geo": {"lat":38.00, "long":97.00}                        },"sensor-ipad": {"id": 13,"ip": "67.185.72.1","description": "Sensor ipad attached to carbon cylinders","temp": 34,"c02_level": 1370,"geo": {"lat":47.41, "long":-122.00}},"sensor-inest": {"id": 8,"ip": "208.109.163.218","description": "Sensor attached to the factory ceilings","temp": 40,"c02_level": 1346,"geo": {"lat":33.61, "long":-111.89}},"sensor-istick": {"id": 5,"ip": "204.116.105.67","description": "Sensor embedded in exhaust pipes in the ceilings","temp": 40,"c02_level": 1574,"geo": {"lat":35.93, "long":-85.46}}}
}

代码示例:

package spark.project_1

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/*
 * Author Mr. Guo
 * Create 2018/10/19 - 14:36
 */

/**
 * One sensor reading flattened out of the nested `source` map.
 *
 * Field names must match the column aliases selected in `dispose_json`, because
 * `Dataset.as[DeviceAlert]` binds columns to fields by name, not by position.
 */
case class DeviceAlert(dcId: String, deviceType: String, ip: String, deviceId: Long, temp: Long,
                       c02_level: Long, lat: Double, lon: Double)

/**
 * Example 1: flatten a JSON document whose `source` object maps sensor names to sensor records.
 *
 * `source` is declared as `MapType(StringType, <sensor struct>)` so that `explode` turns every
 * sensor entry into its own row, with a `key` column (the sensor name) and a `value` column
 * (the sensor record).
 */
object dispose_json {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val spark = SparkSession.builder()
      .config(conf)
      .master("local[2]")
      .appName("dispose_json")
      .getOrCreate()
    spark.sparkContext.setLogLevel("error")
    // Stop the session even if a step below throws, so the local context is released.
    try run(spark)
    finally spark.stop()
  }

  /** Parses the sample document, explodes the `source` map and prints every stage. */
  private def run(spark: SparkSession): Unit = {
    println("--------------------------------------------------------------------")
    // Implicit conversions: Seq(...).toDS(), the $"col" syntax and the DeviceAlert encoder.
    import spark.implicits._

    // Each physical line must begin with `|` so stripMargin removes the source-code margin.
    // Each Dataset element is parsed as one JSON record, so a multi-line document is fine.
    val dataDS1 = Seq(
      """|{
         |"dc_id": "dc-101",
         |"source": {
         |    "sensor-igauge": {
         |      "id": 10,
         |      "ip": "68.28.91.22",
         |      "description": "Sensor attached to the container ceilings",
         |      "temp":35,
         |      "c02_level": 1475,
         |      "geo": {"lat":38.00, "long":97.00}
         |    },
         |    "sensor-ipad": {
         |      "id": 13,
         |      "ip": "67.185.72.1",
         |      "description": "Sensor ipad attached to carbon cylinders",
         |      "temp": 34,
         |      "c02_level": 1370,
         |      "geo": {"lat":47.41, "long":-122.00}
         |    },
         |    "sensor-inest": {
         |      "id": 8,
         |      "ip": "208.109.163.218",
         |      "description": "Sensor attached to the factory ceilings",
         |      "temp": 40,
         |      "c02_level": 1346,
         |      "geo": {"lat":33.61, "long":-111.89}
         |    },
         |    "sensor-istick": {
         |      "id": 5,
         |      "ip": "204.116.105.67",
         |      "description": "Sensor embedded in exhaust pipes in the ceilings",
         |      "temp": 40,
         |      "c02_level": 1574,
         |      "geo": {"lat":35.93, "long":-85.46}
         |    }
         |  }
         |}""".stripMargin).toDS()

    // Schema of a single sensor record (the value side of the `source` map).
    val sensorSchema = new StructType()
      .add("description", StringType)
      .add("ip", StringType)
      .add("id", LongType)
      .add("temp", LongType)
      .add("c02_level", LongType)
      .add("geo", new StructType().add("lat", DoubleType).add("long", DoubleType))

    val schema1 = new StructType()
      .add("dc_id", StringType)
      .add("source", MapType(StringType, sensorSchema))

    // json(Dataset[String]) replaces the json(RDD[String]) overload deprecated in Spark 2.2.
    val df1 = spark.read.schema(schema1).json(dataDS1)
    df1.printSchema()
    df1.show(false)
    println("=======================================")

    // explode on a map column produces one row per entry with columns `key` and `value`.
    val explodeDF = df1.select($"dc_id", explode($"source"))
    explodeDF.printSchema()
    explodeDF.show(10, false)
    println("=======================================")

    val notifydevicesDS = explodeDF.select(
      $"dc_id".as("dcId"),
      $"key".as("deviceType"),
      $"value".getItem("ip").as("ip"),
      $"value".getItem("id").as("deviceId"),
      $"value".getItem("c02_level").as("c02_level"),
      $"value".getItem("temp").as("temp"),
      $"value".getItem("geo").getItem("lat").as("lat"),
      $"value".getItem("geo").getItem("long").as("lon")
    ).as[DeviceAlert]
    notifydevicesDS.printSchema()
    notifydevicesDS.show(20, false)
  }
}

二、数据准备:

{"devices": {"thermostats": {"peyiJNo0IldT2YlIVtYaGQ": {"device_id": "peyiJNo0IldT2YlIVtYaGQ","locale": "en-US","software_version": "4.0","structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name": "Hallway Upstairs","last_connection": "2016-10-31T23:59:59.000Z","is_online": true,"can_cool": true,"can_heat": true,"is_using_emergency_heat": true,"has_fan": true,"fan_timer_active": true,"fan_timer_timeout": "2016-10-31T23:59:59.000Z","temperature_scale": "F","target_temperature_f": 72,"target_temperature_high_f": 80,"target_temperature_low_f": 65,"eco_temperature_high_f": 80,"eco_temperature_low_f": 65,"away_temperature_high_f": 80,"away_temperature_low_f": 65,"hvac_mode": "heat","humidity": 40,"hvac_state": "heating","is_locked": true,"locked_temp_min_f": 65,"locked_temp_max_f": 80}},"smoke_co_alarms": {"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {"device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale": "en-US","software_version": "1.01","structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name": "Jane's Room","last_connection": "2016-10-31T23:59:59.000Z","is_online": true,"battery_health": "ok","co_alarm_state": "ok","smoke_alarm_state": "ok","is_manual_test_active": true,"last_manual_test_time": "2016-10-31T23:59:59.000Z","ui_color_state": "gray"}},"cameras": {"awJo6rH0IldT2YlIVtYaGQ": {"device_id": "awJo6rH","software_version": "4.0","structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name": "Foyer","is_online": true,"is_streaming": true,"is_audio_input_enabled": true,"last_is_online_change": "2016-12-29T18:42:00.000Z","is_video_history_enabled": true,"web_url": "=access_token","app_url": "nestmobile://cameras/device_id?auth=access_token","is_public_share_enabled": true,"activity_zones": { "name": "Walkway", "id": 244083 },"last_event": "2016-10-31T23:59:59.000Z"}}}}

代码示例:

package spark.project_1

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/*
 * Author Mr. Guo
 * Create 2018/10/19 - 14:36
 */

// NOTE(review): DeviceAlert is not used by this example, and both it and `dispose_json` are
// also declared by example 1 in the same package `spark.project_1`. Keep the two examples
// in separate projects (or rename one) to avoid a duplicate-definition compile error.
case class DeviceAlert(dcId: String, deviceType: String, ip: String, deviceId: Long, temp: Long,
                       c02_level: Long, lat: Double, lon: Double)

/**
 * Example 2: flatten a document whose `devices` struct holds three maps
 * (`thermostats`, `smoke_co_alarms`, `cameras`), each keyed by device id.
 *
 * Each category map is exploded into one row per device. Selected fields are then projected
 * out of the `value` struct, and thermostats are joined with cameras on software version.
 */
object dispose_json {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val spark = SparkSession.builder()
      .config(conf)
      .master("local[2]")
      .appName("dispose_json")
      .getOrCreate()
    spark.sparkContext.setLogLevel("error")
    // Stop the session even if a step below throws, so the local context is released.
    try run(spark)
    finally spark.stop()
  }

  /** Parses the sample document, explodes each device map and prints the results. */
  private def run(spark: SparkSession): Unit = {
    println("--------------------------------------------------------------------")
    // Implicit conversions: Seq(...).toDS() and the $"col" syntax.
    import spark.implicits._

    // Each physical line must begin with `|` so stripMargin removes the source-code margin.
    // Each Dataset element is parsed as one JSON record, so a multi-line document is fine.
    val dataDS2 = Seq(
      """|{
         |  "devices": {
         |     "thermostats": {
         |        "peyiJNo0IldT2YlIVtYaGQ": {
         |          "device_id": "peyiJNo0IldT2YlIVtYaGQ",
         |          "locale": "en-US",
         |          "software_version": "4.0",
         |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
         |          "where_name": "Hallway Upstairs",
         |          "last_connection": "2016-10-31T23:59:59.000Z",
         |          "is_online": true,
         |          "can_cool": true,
         |          "can_heat": true,
         |          "is_using_emergency_heat": true,
         |          "has_fan": true,
         |          "fan_timer_active": true,
         |          "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
         |          "temperature_scale": "F",
         |          "target_temperature_f": 72,
         |          "target_temperature_high_f": 80,
         |          "target_temperature_low_f": 65,
         |          "eco_temperature_high_f": 80,
         |          "eco_temperature_low_f": 65,
         |          "away_temperature_high_f": 80,
         |          "away_temperature_low_f": 65,
         |          "hvac_mode": "heat",
         |          "humidity": 40,
         |          "hvac_state": "heating",
         |          "is_locked": true,
         |          "locked_temp_min_f": 65,
         |          "locked_temp_max_f": 80
         |          }
         |        },
         |        "smoke_co_alarms": {
         |          "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
         |            "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
         |            "locale": "en-US",
         |            "software_version": "1.01",
         |            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
         |            "where_name": "Jane's Room",
         |            "last_connection": "2016-10-31T23:59:59.000Z",
         |            "is_online": true,
         |            "battery_health": "ok",
         |            "co_alarm_state": "ok",
         |            "smoke_alarm_state": "ok",
         |            "is_manual_test_active": true,
         |            "last_manual_test_time": "2016-10-31T23:59:59.000Z",
         |            "ui_color_state": "gray"
         |            }
         |          },
         |       "cameras": {
         |        "awJo6rH0IldT2YlIVtYaGQ": {
         |          "device_id": "awJo6rH",
         |          "software_version": "4.0",
         |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
         |          "where_name": "Foyer",
         |          "is_online": true,
         |          "is_streaming": true,
         |          "is_audio_input_enabled": true,
         |          "last_is_online_change": "2016-12-29T18:42:00.000Z",
         |          "is_video_history_enabled": true,
         |          "web_url": "=access_token",
         |          "app_url": "nestmobile://cameras/device_id?auth=access_token",
         |          "is_public_share_enabled": true,
         |          "activity_zones": { "name": "Walkway", "id": 244083 },
         |          "last_event": "2016-10-31T23:59:59.000Z"
         |          }
         |        }
         |      }
         |     }""".stripMargin).toDS()

    // Per-device record schemas (the value side of each device-id-keyed map).
    val thermostatSchema = new StructType()
      .add("device_id", StringType)
      .add("locale", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("last_connection", StringType)
      .add("is_online", BooleanType)
      .add("can_cool", BooleanType)
      .add("can_heat", BooleanType)
      .add("is_using_emergency_heat", BooleanType)
      .add("has_fan", BooleanType)
      .add("fan_timer_active", BooleanType)
      .add("fan_timer_timeout", StringType)
      .add("temperature_scale", StringType)
      .add("target_temperature_f", DoubleType)
      .add("target_temperature_high_f", DoubleType)
      .add("target_temperature_low_f", DoubleType)
      .add("eco_temperature_high_f", DoubleType)
      .add("eco_temperature_low_f", DoubleType)
      .add("away_temperature_high_f", DoubleType)
      .add("away_temperature_low_f", DoubleType)
      .add("hvac_mode", StringType)
      .add("humidity", DoubleType)
      .add("hvac_state", StringType)
      .add("is_locked", BooleanType)
      .add("locked_temp_min_f", DoubleType)
      .add("locked_temp_max_f", DoubleType)

    val smokeAlarmSchema = new StructType()
      .add("device_id", StringType)
      .add("locale", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("last_connection", StringType)
      .add("is_online", BooleanType)
      .add("battery_health", StringType)
      .add("co_alarm_state", StringType)
      .add("smoke_alarm_state", StringType)
      .add("is_manual_test_active", BooleanType)
      .add("last_manual_test_time", StringType)
      .add("ui_color_state", StringType)

    val cameraSchema = new StructType()
      .add("device_id", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("is_online", BooleanType)
      .add("is_streaming", BooleanType)
      .add("is_audio_input_enabled", BooleanType)
      .add("last_is_online_change", StringType)
      .add("is_video_history_enabled", BooleanType)
      .add("web_url", StringType)
      .add("app_url", StringType)
      .add("is_public_share_enabled", BooleanType)
      .add("activity_zones", new StructType().add("name", StringType).add("id", LongType))
      .add("last_event", StringType)

    val schema2 = new StructType()
      .add("devices", new StructType()
        .add("thermostats", MapType(StringType, thermostatSchema))
        .add("smoke_co_alarms", MapType(StringType, smokeAlarmSchema))
        .add("cameras", MapType(StringType, cameraSchema)))

    // json(Dataset[String]) replaces the json(RDD[String]) overload deprecated in Spark 2.2.
    val df2 = spark.read.schema(schema2).json(dataDS2)

    // Re-serialize every row back into a single JSON string column.
    val stringJsonDF = df2.select(to_json(struct($"*"))).toDF("nestDevice")
    stringJsonDF.show(false)

    // Lift each device-category map out of the `devices` struct into its own column.
    val mapColumnsDF = df2.select(
      $"devices".getItem("smoke_co_alarms").alias("smoke_alarms"),
      $"devices".getItem("cameras").alias("cameras"),
      $"devices".getItem("thermostats").alias("thermostats"))

    // explode on a map column produces one row per device with columns `key` and `value`.
    val explodeThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
    val explodeCamerasDF = mapColumnsDF.select(explode($"cameras"))
    val explodeSmokedAlarmsDF = mapColumnsDF.select(explode($"smoke_alarms"))
    explodeSmokedAlarmsDF.printSchema()

    val thermostatDF = explodeThermostatsDF.select(
      $"value".getItem("device_id").alias("device_id"),
      $"value".getItem("locale").alias("locale"),
      $"value".getItem("where_name").alias("location"),
      $"value".getItem("last_connection").alias("last_connection"),
      $"value".getItem("humidity").alias("humidity"),
      $"value".getItem("target_temperature_f").alias("target_temperature_f"),
      $"value".getItem("hvac_mode").alias("mode"),
      $"value".getItem("software_version").alias("version"))

    val cameraDF = explodeCamerasDF.select(
      $"value".getItem("device_id").alias("device_id"),
      $"value".getItem("where_name").alias("location"),
      $"value".getItem("software_version").alias("version"),
      $"value".getItem("activity_zones").getItem("name").alias("name"),
      $"value".getItem("activity_zones").getItem("id").alias("id"))

    val smokedAlarmsDF = explodeSmokedAlarmsDF.select(
      $"value".getItem("device_id").alias("device_id"),
      $"value".getItem("where_name").alias("location"),
      $"value".getItem("software_version").alias("version"),
      $"value".getItem("last_connection").alias("last_connected"),
      $"value".getItem("battery_health").alias("battery_health"))

    cameraDF.show()
    smokedAlarmsDF.show(false)

    // Demo join: in this sample the thermostat and the camera both report software_version "4.0".
    val joinedDF = thermostatDF.join(cameraDF, "version")
    joinedDF.show(10, false)
  }
}

  

转载于:.html

更多推荐

SparkSql处理嵌套json数据

本文发布于:2024-02-26 20:19:57,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1703765.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:嵌套   数据   SparkSql   json

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!