项目
BigData项目
1. 生成GEOHASH字典
1.1 先将全国的经纬度整理成省市区经纬度的格式
-- Build a province/city/district dictionary by self-joining the area table
-- three times: district (qu) -> its parent city (shi) -> its parent province (sheng).
-- FIX: original had "SELECTsheng.AREANAME" (missing whitespace after SELECT).
-- NOTE: "provience" is a misspelling of "province" but is kept, since downstream
-- queries (the geohash dictionary job) select it by this exact name.
CREATE TABLE area_dict
AS
SELECT
    sheng.AREANAME AS provience,
    shi.AREANAME   AS city,
    qu.AREANAME    AS region,
    qu.BD09_LNG    AS BD09_LNG,
    qu.BD09_LAT    AS BD09_LAT
FROM
    t_md_areas qu
JOIN
    t_md_areas shi   ON qu.PARENTID  = shi.ID
JOIN
    t_md_areas sheng ON shi.PARENTID = sheng.ID
1.2 将表的经纬度转换成GEOHASH
import java.util.Properties
import ch.hsr.geohash.GeoHash
import org.apache.spark.sql.SparkSession

/**
 * Generates a geohash dictionary: reads the province/city/region + BD09
 * coordinate table from MySQL, converts each coordinate into a 5-character
 * geohash via a UDF, and writes the result as parquet to HDFS.
 *
 * FIX(review): the published source was collapsed onto one line (statements
 * and import lines fused together) and would not compile; reconstructed here.
 */
object GeoHashDictGen {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("geohash字典生成")
      .master("local[*]")
      .getOrCreate()

    // JDBC credentials.
    // NOTE(review): hard-coded root/root — move to configuration for real use.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")

    val frame = spark.read.jdbc("jdbc:mysql://linux01:3306/realtimedw", "area_dict", properties)
    frame.createTempView("df")

    // Convert a GPS point (latitude, longitude) into a 5-character geohash string.
    val gps2geohash = (lat: Double, lng: Double) => {
      GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5)
    }
    spark.udf.register("gps2geohash", gps2geohash)

    val res = spark.sql(
      """
        |select
        |  provience,
        |  city,
        |  region,
        |  gps2geohash(BD09_LAT,BD09_LNG) as geohash
        |from
        |  df
        |""".stripMargin)

    res.write.parquet("hdfs://linux01:8020/dicts/geodict")
    spark.close()
  }
}
2 设备账号关联得分计算
2.1 测试
import org.apache.spark.sql.SparkSession

/**
 * Local smoke test for the device/account relation-score computation,
 * driven by two small CSV fixtures:
 *  - idbind.csv : T-1 day's accumulated device/account score table
 *  - log.csv    : T day's event log
 *
 * FIX(review): the published source was collapsed onto one line and would
 * not compile; reconstructed here with the original logic unchanged.
 */
object DeviceAccountRelationScoreTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("设备账号关联得分计算")
      .master("local[*]")
      .getOrCreate()

    val idbind = spark.read
      .option("header", "true")
      .option("nullValue", "\\N") // map the literal \N in the data to null
      .csv("D:\\ideaProjects\\sp_boot1\\bigdata_project\\dw_etl\\testdata\\idbind\\2021-01-11\\idbind.csv")

    val log = spark.read
      .option("header", "true")
      .csv("D:\\ideaProjects\\sp_boot1\\bigdata_project\\dw_etl\\testdata\\logdata\\2021-01-12\\log.csv")

    idbind.createTempView("rs")
    log.createTempView("t_log")

    // Full-join both datasets on (deviceid, account) so pairs present on only
    // one side survive with nulls on the other side.
    // NOTE(review): t1 null-ifies account == "" while t2 null-ifies account == "''"
    // (a literal two-quote string) — presumably matching each fixture's encoding;
    // confirm against the real data format.
    val joined = spark.sql(
      """
        |select
        |  t1.deviceid rs_deviceid,
        |  if(trim(t1.account)="",null,t1.account) rs_account,
        |  cast(t1.score as double) rs_score,
        |  t1.first_time first_time,
        |  t1.last_time last_time,
        |  t2.deviceid log_deviceid,
        |  if(trim(t2.account)="''",null,t2.account) log_account,
        |  cast(t2.score as double) log_score,
        |  t2.time time
        |from
        |  rs t1 full join t_log t2 on t1.deviceid=t2.deviceid and t1.account = t2.account
        |""".stripMargin)

    /* Expected join result for the fixtures:
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     * |rs_deviceid|rs_account|rs_score|first_time|last_time|log_deviceid|log_account|log_score|time|
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     * |         d1|        c1|   200.0|         1|       10|          d1|         c1|    200.0|  11|
     * |       null|      null|    null|      null|     null|          d1|         c5|    100.0|  11|
     * |       null|      null|    null|      null|     null|          d6|         c6|    200.0|  11|
     * |         d3|      null|    null|      null|     null|        null|       null|     null|null|
     * |       null|      null|    null|      null|     null|          d5|         c5|    200.0|  11|
     * |         d2|        c2|   800.0|         2|        8|        null|       null|     null|null|
     * |         d2|        c3|   600.0|         3|        7|          d2|         c3|    100.0|  11|
     * |         d4|        c4|   200.0|         1|       10|        null|       null|     null|null|
     * |         d5|      null|    null|      null|     null|        null|       null|     null|null|
     * |       null|      null|    null|      null|     null|          d7|       null|     null|null|
     * +-----------+----------+--------+----------+---------+------------+-----------+---------+----+
     */
    joined.createTempView("t_join")

    // Merge the two sides: a pair seen only in T-1 decays (score * 0.7);
    // otherwise both sides' scores are summed (null treated as 0).
    val res1 = spark.sql(
      """
        |select
        |  nvl(rs_deviceid,log_deviceid) deviceid,
        |  nvl(rs_account,log_account) account,
        |  if(rs_deviceid is not null and log_deviceid is null,rs_score*0.7,nvl(rs_score,0)+nvl(log_score,0)) score,
        |  nvl(first_time,time) first_time,
        |  nvl(time,last_time) last_time
        |from
        |  t_join
        |""".stripMargin)

    /* Expected merged result:
     * +--------+-------+-----+----------+---------+
     * |deviceid|account|score|first_time|last_time|
     * +--------+-------+-----+----------+---------+
     * |      d1|     c1|400.0|         1|       11|
     * |      d1|     c5|100.0|        11|       11|
     * |      d6|     c6|200.0|        11|       11|
     * |      d3|   null| null|      null|     null|
     * |      d5|     c5|200.0|        11|       11|
     * |      d2|     c2|560.0|         2|        8|
     * |      d2|     c3|700.0|         3|       11|
     * |      d4|     c4|140.0|         1|       10|
     * |      d5|   null| null|      null|     null|
     * |      d7|   null|  0.0|      null|     null|
     * +--------+-------+-----+----------+---------+
     */

    // Split into account-less rows and rows with an account, then drop the
    // account-less row of any device that also has an account-bearing row
    // (the full join collapses it into the account-bearing row).
    res1.where("account is null").createTempView("t_null")
    res1.where("account is not null").createTempView("t_you")

    spark.sql(
      """
        |select
        |  nvl(you.deviceid,wu.deviceid) deviceid,
        |  you.account account,
        |  you.score score,
        |  you.first_time first_time,
        |  you.last_time last_time
        |from
        |  t_you you full join t_null wu on you.deviceid = wu.deviceid
        |""".stripMargin).show()

    /* Expected final result:
     * +--------+-------+-----+----------+---------+
     * |deviceid|account|score|first_time|last_time|
     * +--------+-------+-----+----------+---------+
     * |      d2|     c2|560.0|         2|        8|
     * |      d2|     c3|700.0|         3|       11|
     * |      d3|   null| null|      null|     null|
     * |      d1|     c1|400.0|         1|       11|
     * |      d1|     c5|100.0|        11|       11|
     * |      d4|     c4|140.0|         1|       10|
     * |      d6|     c6|200.0|        11|       11|
     * |      d7|   null| null|      null|     null|
     * |      d5|     c5|200.0|        11|       11|
     * +--------+-------+-----+----------+---------+
     */
    spark.close()
  }
}
2.2 真实数据设备&账号关联得分计算
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
 * Production job: rolls the device/account relation-score table forward one day.
 * Reads T-1's score table (dwd.device_account_relation) and T's raw event log
 * (ods.event_app_log), merges them, and writes T's partition back to the table.
 *
 * Scoring rule (per device/account pair):
 *  - seen only in T-1 -> old score decays by 0.7
 *  - otherwise        -> T-1 score + T score (null treated as 0)
 *
 * FIX(review): the published source was collapsed onto one line and lacked the
 * Logger/Level imports; also, `log_score` is now wrapped in nvl(...) to match
 * the tested logic in DeviceAccountRelationScoreTest (the bare `+ log_score`
 * would null the whole sum if log_score were ever null).
 */
object DeviceAccountRelationScore {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      //.master("local[*]")
      .config("spark.sql.shuffle.partitions", "10") // parallelism of the Spark shuffle stages
      .appName("设备账号关联得分计算")
      .enableHiveSupport()
      .getOrCreate()

    // NOTE(review): the T / T-1 dates are hard-coded; pass them in via args
    // for a real scheduled run.
    // Load T-1 day's device/account relation score table.
    val relation = spark.read.table("dwd.device_account_relation").where("dt='2021-01-09'")
    // Load T day's event log.
    val log = spark.read.table("ods.event_app_log").where("dt='2021-01-10'")
    // relation.show(10,false)
    // log.show(10,false)

    // Aggregate T-day logs into one score per (deviceid, account):
    // 100 points per distinct session, earliest timestamp kept.
    // NOTE(review): GROUP BY uses the raw account while the SELECT null-ifies
    // empty strings, so '' and NULL form separate groups — confirm intended.
    log.createTempView("log_detail")
    val logAggr = spark.sql(
      """
        |select
        |  deviceid,
        |  if(trim(account)='',null,account) as account,
        |  cast(count(distinct sessionid)*100 as double) as score,
        |  min(timestamp) as time
        |from log_detail
        |group by deviceid,account
        |""".stripMargin)

    relation.createTempView("re")
    logAggr.createTempView("log")

    // Full-join T-1 scores with T-day scores on (deviceid, account).
    val joined = spark.sql(
      """
        |select
        |  re.deviceid as re_deviceid,
        |  re.account as re_account,
        |  cast(re.score as double) as re_score,
        |  re.first_time as re_firsttime,
        |  re.last_time as re_lasttime,
        |  log.deviceid as log_deviceid,
        |  log.account as log_account,
        |  cast(log.score as double) as log_score,
        |  log.time as time
        |from re full join log
        |on re.deviceid=log.deviceid and re.account=log.account
        |""".stripMargin)
    joined.createTempView("joined")

    // Apply the scoring rule (decay vs. sum) and carry first/last seen times.
    val res = spark.sql(
      """
        |select
        |  nvl(re_deviceid,log_deviceid) as deviceid,
        |  nvl(re_account,log_account) as account,
        |  if(re_deviceid is not null and log_deviceid is null,re_score*0.7,nvl(re_score,0)+nvl(log_score,0)) score,
        |  nvl(re_firsttime,time) first_time,
        |  nvl(time,re_lasttime) as last_time
        |from joined
        |""".stripMargin)

    // Drop the account-less row of any device that already has an associated
    // account (the full join collapses it), then write T-day's partition.
    res.where("account is null").createTempView("t_null")
    res.where("account is not null").createTempView("t_you")

    spark.sql(
      """
        |insert into table dwd.device_account_relation partition(dt='2021-01-10')
        |select
        |  nvl(t_you.deviceid,t_null.deviceid) as deviceid,
        |  t_you.account as account,
        |  t_you.score as score,
        |  t_you.first_time as first_time,
        |  t_you.last_time as last_time
        |from t_null full join t_you on t_null.deviceid=t_you.deviceid
        |""".stripMargin)

    spark.close()
  }
}
2.3 在集群上运行
- 将程序打包发到集群上
- 在集群上运行
/bigdata/spark-2.3.3-bin-hadoop2.7/bin/spark-submit --master yarn --deploy-mode client --executor-memory 1g --executor-cores 2 --num-executors 3 --class cn.dwetl.DeviceAccountRelationScore /root/spark10-1.0-SNAPSHOT.jar hdfs://Linux01:8020/wc hdfs://Linux01:8020/out01
更多推荐
BigData项目
发布评论