BigData Project

1. Generate the GEOHASH dictionary

1.1 Organize the nationwide coordinates into province/city/district rows

CREATE TABLE area_dict
AS
SELECT
  sheng.AREANAME provience,
  shi.AREANAME city,
  qu.AREANAME region,
  qu.BD09_LNG BD09_LNG,
  qu.BD09_LAT BD09_LAT
FROM t_md_areas qu
JOIN t_md_areas shi ON qu.PARENTID = shi.ID
JOIN t_md_areas sheng ON shi.PARENTID = sheng.ID
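
The query walks the PARENTID chain by self-joining t_md_areas twice, so every output row carries the full province/city/region path plus the district's BD09 coordinates. Before generating geohashes it is worth a quick sanity check that no row is missing a level of the path or its coordinates. A minimal sketch, assuming the same MySQL instance and credentials used in the next step:

import java.util.Properties
import org.apache.spark.sql.SparkSession

object AreaDictCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("area_dict sanity check").master("local[*]").getOrCreate()
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    val dict = spark.read.jdbc("jdbc:mysql://linux01:3306/realtimedw", "area_dict", props)
    // Count rows that are missing coordinates or any level of the area path
    dict.selectExpr(
      "count(*) as total",
      "count(if(BD09_LNG is null or BD09_LAT is null, 1, null)) as missing_gps",
      "count(if(provience is null or city is null or region is null, 1, null)) as missing_area"
    ).show()
    spark.close()
  }
}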

1.2 Convert the table's coordinates to GEOHASH

import java.util.Properties

import ch.hsr.geohash.GeoHash
import org.apache.spark.sql.SparkSession

object GeoHashDictGen {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("geohash dictionary generation")
      .master("local[*]")
      .getOrCreate()

    // Read area_dict from MySQL
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    val frame = spark.read.jdbc("jdbc:mysql://linux01:3306/realtimedw", "area_dict", properties)
    frame.createTempView("df")

    // Convert a gps coordinate into a 5-character geohash string
    val gps2geohash = (lat: Double, lng: Double) => {
      GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5)
    }
    spark.udf.register("gps2geohash", gps2geohash)

    val res = spark.sql(
      """
        |select
        |  provience,
        |  city,
        |  region,
        |  gps2geohash(BD09_LAT, BD09_LNG) as geohash
        |from df
        |""".stripMargin)

    res.write.parquet("hdfs://linux01:8020/dicts/geodict")
    spark.close()
  }
}
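
Once the parquet dictionary is on HDFS, downstream jobs can resolve a raw gps point to its province/city/region by computing the same 5-character geohash and joining against the dictionary. A sketch of that lookup; the event data and names here are hypothetical, not from the original post:

import ch.hsr.geohash.GeoHash
import org.apache.spark.sql.SparkSession

object GeoDictLookupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("geodict lookup").master("local[*]").getOrCreate()
    import spark.implicits._

    spark.read.parquet("hdfs://linux01:8020/dicts/geodict").createTempView("geodict")

    // Hypothetical events carrying raw gps coordinates
    val events = Seq(("e1", 39.9151, 116.4040)).toDF("eventid", "lat", "lng")
    events.createTempView("events")

    spark.udf.register("gps2geohash", (lat: Double, lng: Double) =>
      GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5))

    // Resolve each event's gps to province/city/region via the 5-char geohash
    spark.sql(
      """
        |select e.eventid, d.provience, d.city, d.region
        |from events e
        |left join geodict d on gps2geohash(e.lat, e.lng) = d.geohash
        |""".stripMargin).show()
    spark.close()
  }
}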

2. Device-account relation score calculation

2.1 Test with sample data

import org.apache.spark.sql.SparkSession

object DeviceAccountRelationScoreTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("device-account relation score calculation")
      .master("local[*]")
      .getOrCreate()

    val idbind = spark.read
      .option("header", "true")
      .option("nullValue", "\\N") // automatically replace \N in the data with null
      .csv("D:\\ideaProjects\\sp_boot1\\bigdata_project\\dw_etl\\testdata\\idbind\\2021-01-11\\idbind.csv")
    val log = spark.read
      .option("header", "true")
      .csv("D:\\ideaProjects\\sp_boot1\\bigdata_project\\dw_etl\\testdata\\logdata\\2021-01-12\\log.csv")
    idbind.createTempView("rs")
    log.createTempView("t_log")

    // Join the T-1 score table and the T-day log data
    val joined = spark.sql(
      """
        |select
        |  t1.deviceid rs_deviceid,
        |  if(trim(t1.account)="",null,t1.account) rs_account,
        |  cast(t1.score as double) rs_score,
        |  t1.first_time first_time,
        |  t1.last_time last_time,
        |  t2.deviceid log_deviceid,
        |  if(trim(t2.account)="''",null,t2.account) log_account,
        |  cast(t2.score as double) log_score,
        |  t2.time time
        |from
        |  rs t1 full join t_log t2 on t1.deviceid=t2.deviceid and t1.account = t2.account
        |""".stripMargin)
    /**
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     * |rs_deviceid|rs_account|rs_score|first_time|last_time|log_deviceid|log_account|log_score|time|
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     * |         d1|        c1|   200.0|         1|       10|          d1|         c1|    200.0|  11|
     * |       null|      null|    null|      null|     null|          d1|         c5|    100.0|  11|
     * |       null|      null|    null|      null|     null|          d6|         c6|    200.0|  11|
     * |         d3|      null|    null|      null|     null|        null|       null|     null|null|
     * |       null|      null|    null|      null|     null|          d5|         c5|    200.0|  11|
     * |         d2|        c2|   800.0|         2|        8|        null|       null|     null|null|
     * |         d2|        c3|   600.0|         3|        7|          d2|         c3|    100.0|  11|
     * |         d4|        c4|   200.0|         1|       10|        null|       null|     null|null|
     * |         d5|      null|    null|      null|     null|        null|       null|     null|null|
     * |       null|      null|    null|      null|     null|          d7|       null|     null|null|
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     */
    joined.createTempView("t_join")

    val res1 = spark.sql(
      """
        |select
        |  nvl(rs_deviceid,log_deviceid) deviceid,
        |  nvl(rs_account,log_account) account,
        |  if(rs_deviceid is not null and log_deviceid is null,rs_score*0.7,nvl(rs_score,0)+nvl(log_score,0)) score,
        |  nvl(first_time,time) first_time,
        |  nvl(time,last_time) last_time
        |from t_join
        |""".stripMargin)
    /**
     * +--------+-------+-----+----------+---------+
     * |deviceid|account|score|first_time|last_time|
     * +--------+-------+-----+----------+---------+
     * |      d1|     c1|400.0|         1|       11|
     * |      d1|     c5|100.0|        11|       11|
     * |      d6|     c6|200.0|        11|       11|
     * |      d3|   null| null|      null|     null|
     * |      d5|     c5|200.0|        11|       11|
     * |      d2|     c2|560.0|         2|        8|
     * |      d2|     c3|700.0|         3|       11|
     * |      d4|     c4|140.0|         1|       10|
     * |      d5|   null| null|      null|     null|
     * |      d7|   null|  0.0|      null|     null|
     * +--------+-------+-----+----------+---------+
     */

    // Drop the useless rows: null-account entries for devices that already have an associated account
    res1.where("account is null").createTempView("t_null")
    res1.where("account is not null").createTempView("t_you")
    spark.sql(
      """
        |select
        |  nvl(you.deviceid,wu.deviceid) deviceid,
        |  you.account account,
        |  you.score score,
        |  you.first_time first_time,
        |  you.last_time last_time
        |from
        |  t_you you full join t_null wu on you.deviceid = wu.deviceid
        |""".stripMargin).show()
    /**
     * +--------+-------+-----+----------+---------+
     * |deviceid|account|score|first_time|last_time|
     * +--------+-------+-----+----------+---------+
     * |      d2|     c2|560.0|         2|        8|
     * |      d2|     c3|700.0|         3|       11|
     * |      d3|   null| null|      null|     null|
     * |      d1|     c1|400.0|         1|       11|
     * |      d1|     c5|100.0|        11|       11|
     * |      d4|     c4|140.0|         1|       10|
     * |      d6|     c6|200.0|        11|       11|
     * |      d7|   null| null|      null|     null|
     * |      d5|     c5|200.0|        11|       11|
     * +--------+-------+-----+----------+---------+
     */
    spark.close()
  }
}
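
Stripped of the SQL plumbing, the update rule the two queries implement is simple: a (device, account) pair that existed yesterday but did not appear today decays by a factor of 0.7, and in every other case yesterday's score and today's score are added. A plain-Scala restatement of that rule (illustrative, not part of the original job), checked against the tables above:

object RelationScoreRule {
  // oldScore: the pair's score from the T-1 table (None if the pair is new);
  // todayScore: the pair's aggregated score from the T-day log (None if absent today).
  def updateScore(oldScore: Option[Double], todayScore: Option[Double]): Double =
    (oldScore, todayScore) match {
      case (Some(prev), None) => prev * 0.7 // bound before, not seen today: decay
      case (prev, today)      => prev.getOrElse(0.0) + today.getOrElse(0.0) // otherwise: accumulate
    }

  def main(args: Array[String]): Unit = {
    println(updateScore(Some(200.0), Some(200.0))) // 400.0 -> (d1,c1)
    println(updateScore(Some(800.0), None))        // 560.0 -> (d2,c2)
    println(updateScore(None, Some(100.0)))        // 100.0 -> (d1,c5)
  }
}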

2.2 Device & account relation score calculation on real data

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object DeviceAccountRelationScore {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder()
      //.master("local[*]")
      .config("spark.sql.shuffle.partitions", "10") // parallelism of the Spark job
      .appName("device-account relation score calculation")
      .enableHiveSupport()
      .getOrCreate()

    // Load the T-1 day device-account relation score table
    val relation = spark.read.table("dwd.device_account_relation").where("dt='2021-01-09'")
    // Load the T-day log table
    val log = spark.read.table("ods.event_app_log").where("dt='2021-01-10'")
    // relation.show(10, false)
    // log.show(10, false)

    // First aggregate the T-day logs into per-(device, account) scores
    log.createTempView("log_detail")
    val logAggr = spark.sql(
      """
        |select
        |  deviceid,
        |  if(trim(account)='',null,account) as account,
        |  cast(count(distinct sessionid)*100 as double) as score,
        |  min(timestamp) as time
        |from log_detail
        |group by deviceid,account
        |""".stripMargin)

    relation.createTempView("re")
    logAggr.createTempView("log")
    val joined = spark.sql(
      """
        |select
        |  re.deviceid as re_deviceid,
        |  re.account as re_account,
        |  cast(re.score as double) as re_score,
        |  re.first_time as re_firsttime,
        |  re.last_time as re_lasttime,
        |  log.deviceid as log_deviceid,
        |  log.account as log_account,
        |  cast(log.score as double) as log_score,
        |  log.time as time
        |from re full join log
        |on re.deviceid=log.deviceid and re.account=log.account
        |""".stripMargin)

    joined.createTempView("joined")
    val res = spark.sql(
      """
        |select
        |  nvl(re_deviceid,log_deviceid) as deviceid,
        |  nvl(re_account,log_account) as account,
        |  if(re_deviceid is not null and log_deviceid is null,re_score*0.7,nvl(re_score,0)+nvl(log_score,0)) score,
        |  nvl(re_firsttime,time) first_time,
        |  nvl(time,re_lasttime) as last_time
        |from joined
        |""".stripMargin)

    // From this result, drop the null-account rows for devices that already have an associated account
    res.where("account is null").createTempView("t_null")
    res.where("account is not null").createTempView("t_you")
    spark.sql(
      """
        |insert into table dwd.device_account_relation partition(dt='2021-01-10')
        |select
        |  nvl(t_you.deviceid,t_null.deviceid) as deviceid,
        |  t_you.account as account,
        |  t_you.score as score,
        |  t_you.first_time as first_time,
        |  t_you.last_time as last_time
        |from t_null full join t_you on t_null.deviceid=t_you.deviceid
        |""".stripMargin)

    spark.close()
  }
}
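
The partition dates above are hardcoded, while the spark-submit command in the next section passes arguments to the main class. A natural refinement, sketched here as an assumption rather than taken from the original code, is to derive T and T-1 from args:

import java.time.LocalDate

object PartitionDates {
  // Derive the (T-1, T) partition date strings from the first program argument,
  // defaulting T to yesterday when no argument is given. Hypothetical helper.
  def partitionDates(args: Array[String]): (String, String) = {
    val t = if (args.nonEmpty) LocalDate.parse(args(0)) else LocalDate.now().minusDays(1)
    (t.minusDays(1).toString, t.toString)
  }
  // usage inside main:
  //   val (dtPrev, dtCur) = partitionDates(args)
  //   val relation = spark.read.table("dwd.device_account_relation").where(s"dt='$dtPrev'")
  //   val log      = spark.read.table("ods.event_app_log").where(s"dt='$dtCur'")
}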

2.3 Run on the cluster

  • Package the program and upload it to the cluster
  • Run it on the cluster with spark-submit:
/bigdata/spark-2.3.3-bin-hadoop2.7/bin/spark-submit \
  --master yarn \
  --deploy-mode client \
  --executor-memory 1g \
  --executor-cores 2 \
  --num-executors 3 \
  --class cn.dwetl.DeviceAccountRelationScore \
  /root/spark10-1.0-SNAPSHOT.jar \
  hdfs://Linux01:8020/wc hdfs://Linux01:8020/out01
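
After the job finishes, the new partition can be checked quickly, e.g. from spark-shell on the cluster (a sketch, assuming Hive support is enabled):

// In spark-shell, after the job completes:
spark.sql("select * from dwd.device_account_relation where dt='2021-01-10'").show(10, false)
spark.sql("select count(*) from dwd.device_account_relation where dt='2021-01-10'").show()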

 
