大数据【企业级360°全方位用户画像】标签开发代码抽取

编程入门 行业动态 更新时间:2024-10-19 00:21:48

大数据【<a href=https://www.elefans.com/category/jswz/34/1768228.html style=企业级360°全方位用户画像】标签开发代码抽取"/>

大数据【企业级360°全方位用户画像】标签开发代码抽取

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:/ , 博客主页:/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

        在之前的几篇关于标签开发的博客中,博主已经不止一次地为大家介绍了开发代码书写的流程。无论是匹配型标签还是统计型标签,都涉及到了大量的代码重用问题。为了解决这个问题,本篇博客,我们将开始将对代码进行抽取,简便我们的开发!


1、创建一个特质

        对于scala基础语法不太熟悉的朋友们可能有疑惑了。什么是特质呢?

        其实关于scala中特质的介绍,博主在前几个月写scala专栏的时候就科普过了。感兴趣的朋友可以👉《scala快速入门系列【特质】》

        简单来说就是,scala中没有Java中的接口(interface),替代的概念是——特质。

        特质是scala中代码复用的基础单元,特质的定义和抽象类的定义很像,但它是使用trait关键字。

        我们先在IDEA中创建一个特质

        然后咱们就可以开始写代码。

        因为在前面的几篇具体讲解标签开发的博客中,博主已经将流程讲了好几遍,算得上是非常透彻了。所以本篇博客,博主在这直接贴上代码,并不做过多的过程说明。每一步具体的含义,都已经体现在了代码中,如果各位朋友们看了有任何的疑惑,可以私信我,也可以在评论区留言。

package com.czxy.baseimport java.util.Properties
import com.czxy.bean.HBaseMeta
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/** @Author: Alice菌* @Date: 2020/6/13 08:49* @Description: 此代码用户编写用户画像项目可以重用的代码*/
trait BaseModel {// 所有重复的代码(功能)都抽取到这里// 设置任务的名称def setAppName:String// 设置四级标签iddef setFourTagId:String/* 1. 初始化SparkSession对象  */private val spark:SparkSession = SparkSession.builder().appName(setAppName).master("local[*]").getOrCreate()//导入隐式转换import org.apache.spark.sql.functions._import spark.implicits._/* 2. 连接MySQL  */// 读取application.conf 内的配置private val config: Config = ConfigFactory.load()// 获取urlprivate val url : String = config.getString("jdbc.mysql.url")// 获取tableNameprivate val tableName : String = config.getString("jdbc.mysql.tablename")def getMySQLDF = {// 连接MySQL数据库spark.read.jdbc(url,tableName,new Properties)}/* 3. 读取MySQL数据库的四级标签  */def getFourTag (mysqlCoon: DataFrame): HBaseMeta ={//读取HBase中的四级标签val fourTagsDS: Dataset[Row] = mysqlCoon.select("id","rule").where("id="+setFourTagId)//切分ruleval KVMap: Map[String, String] = fourTagsDS.map(row => {// 获取到rule值val RuleValue: String = row.getAs("rule").toString// 使用“##”对数据进行切分val KVMaps: Array[(String, String)] = RuleValue.split("##").map(kv => {val arr: Array[String] = kv.split("=")(arr(0), arr(1))})KVMaps}).collectAsList().get(0).toMap       //封装成map//   将Map 转换成HbaseMeta样例类val hbaseMeta: HBaseMeta = toHBaseMeta(KVMap)hbaseMeta}/* 4. 读取五级标签数据【单独处理】*/def getFiveTagDF(mysqlConn:DataFrame)={mysqlConn.select("id","rule").where("pid="+setFourTagId).toDF()}/* 5. 读取hbase中的数据,这里将hbase作为数据源进行读取 */def getHbase(hbaseMeta: HBaseMeta)={val hbaseDatas: DataFrame = spark.read.format("com.czxy.tools.HBaseDataSource")// hbaseMeta.zkHosts 就是 192.168.10.20  和 下面是两种不同的写法.option("zkHosts",hbaseMeta.zkHosts).option(HBaseMeta.ZKPORT, hbaseMeta.zkPort).option(HBaseMeta.HBASETABLE, hbaseMeta.hbaseTable).option(HBaseMeta.FAMILY, hbaseMeta.family).option(HBaseMeta.SELECTFIELDS, hbaseMeta.selectFields).load()println(hbaseMeta)hbaseDatas.show()hbaseDatas}/* 6. 五级数据与 HBase 数据进行打标签【单独处理】 */def getNewTag(spark: SparkSession,fiveTagDF:DataFrame,hbaseDF:DataFrame):DataFrame/*** 7.合并历史数据* 将标签写入HBase** @param newTags 新标签* @return 返回最终标签*/def joinAllTags(newTags: DataFrame): DataFrame = {//读取HBase 中的历史数据val oldTags: DataFrame = spark.read.format("com.czxy.tools.HBaseDataSource").option(HBaseMeta.ZKHOSTS, "192.168.10.20").option(HBaseMeta.ZKPORT, "2181").option(HBaseMeta.HBASETABLE, "test").option(HBaseMeta.FAMILY, "detail").option(HBaseMeta.SELECTFIELDS, "userId,tagsId").load()//使用join将新数据和旧数据的tagsId合并到一起val allTags: DataFrame = oldTags.join(newTags, oldTags("userId") === newTags("userId"))//  创建一个新的udf函数,用来拼接 tagsIdval getAllTags: UserDefinedFunction = udf((oldTagsId: String, newTagsId: String) => {if (oldTagsId == "" && newTagsId != "") {newTagsId} else if (oldTagsId != "" && newTagsId == "") {oldTagsId} else if (oldTagsId == "" && newTagsId == "") {""} else {val str: String = oldTagsId + "," + newTagsIdstr.split(",").distinct.mkString(",")}})//获取最终结果allTags.select(when(oldTags("userId").isNotNull, oldTags("userId")).when(newTags("userId").isNotNull, newTags("userId")).as("userId"),getAllTags(oldTags("tagsId"), newTags("tagsId")).as("tagsId"))}/*** 8. 新建一个方法,用于保存数据  save* @param allTags   最终的结果*/def save(allTags: DataFrame): Unit = {//把最终结果保存到HBaseallTags.write.format("com.czxy.tools.HBaseDataSource").option(HBaseMeta.ZKHOSTS, "192.168.10.20").option(HBaseMeta.ZKPORT, "2181").option(HBaseMeta.HBASETABLE, "test").option(HBaseMeta.FAMILY, "detail").option(HBaseMeta.SELECTFIELDS, "userId,tagsId").save()println("结果保存完毕!!!")}/* 9. 断开连接 */def close(): Unit = {spark.close()}//将mysql中的四级标签的rule  封装成HBaseMeta//方便后续使用的时候方便调用def toHBaseMeta(KVMap: Map[String, String]): HBaseMeta = {//开始封装HBaseMeta(KVMap.getOrElse("inType",""),KVMap.getOrElse(HBaseMeta.ZKHOSTS,""),KVMap.getOrElse(HBaseMeta.ZKPORT,""),KVMap.getOrElse(HBaseMeta.HBASETABLE,""),KVMap.getOrElse(HBaseMeta.FAMILY,""),KVMap.getOrElse(HBaseMeta.SELECTFIELDS,""),KVMap.getOrElse(HBaseMeta.ROWKEY,""))}/*** 按照先后顺序, 连接mysql数据库, 读取四级,五级HBase数据* 打标签,最终写入*/def exec(): Unit = {//1.设置日志级别spark.sparkContext.setLogLevel("WARN")//2.连接mysqlval mysqlConnection: DataFrame = getMySQLDF//3. 读取mysql数据库中的四级标签val fourTags: HBaseMeta = getFourTag(mysqlConnection)//4. 读取mysql数据库中的五级标签val fiveTags: Dataset[Row] = getFiveTagDF(mysqlConnection)//读取HBase 中的数据val hBaseMea: DataFrame = getHbase(fourTags)//读取新获取的数据val newTags: DataFrame = getNewTag(spark,fiveTags, hBaseMea)newTags.show()//获取最终结果val allTags: DataFrame = joinAllTags(newTags)allTags.show()//保存到HBasesave(allTags)//断开连接close()}}

2、调用特质

        既然特质我们已经写好了,那么现在我们想要基于用户的工作进行统计型标签开发,那么我们就可以像下面的示例一样。

import com.czxy.base.BaseModel
import com.czxy.bean.{HBaseMeta, TagRule}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction/** @Author: Alice菌* @Date: 2020/6/13 09:48* @Description:基于用户的Job标签,做测试使用*/
object Test extends BaseModel{override def setAppName: String = "Job"override def setFourTagId: String = "65"// 重写Hbase数据与MySQL五级标签数据处理的方法override def getNewTag(spark: SparkSession, fiveTagDF: DataFrame, hbaseDF: DataFrame): DataFrame = {// 引入隐式转换import spark.implicits._//引入java 和scala相互转换import scala.collection.JavaConverters._//引入sparkSQL的内置函数import org.apache.spark.sql.functions._// 对5级标签的数据进行处理val fiveTageList: List[TagRule] = fiveTagDF.map(row => {// row 是一条数据// 获取出id 和 ruleval id: Int = row.getAs("id").toString.toIntval rule: String = row.getAs("rule").toString// 封装样例类TagRule(id,rule)}).collectAsList()   // 将DataSet转换成util.List[TagRule]   这个类型遍历时无法获取id,rule数据.asScala.toList    // 将util.List转换成list   需要隐式转换    import scala.collection.JavaConverters._// 需要自定义UDF函数val getUserTags: UserDefinedFunction = udf((rule: String) => {// 设置标签的默认值var tagId: Int = 0// 遍历每一个五级标签的rulefor (tagRule <- fiveTageList) {if (tagRule.rule == rule) {tagId = tagRule.id}}tagId})val jobNewTags : DataFrame = hbaseDF.select('id.as ("userId"),getUserTags('job).as("tagsId"))jobNewTags.show(5)jobNewTags}def main(args: Array[String]): Unit = {exec()}}

        我们可以发现,现在开发一个标签,我们只需要实现第一步写好的特质,然后在具体的类中设置任务的名称AppName和四级标签的id,以及重写Hbase数据与MySQL五级标签数据处理的方法。然后在程序的主入口main函数中,调用特质中的exec方法即可。

        这大大的减少了我们的工作量。不知道各位朋友感受到了没有呢?

结语

        博主在经过了几个小时的开发后,目前已经成功了开发了15个标签,分别是7个匹配型和8个统计型标签。等过段时间学习了挖掘算法后,再开发几个挖掘型的标签后,有希望完成超过20个标签😎

        如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅

        受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波🙏

        希望我们都能在学习的道路上越走越远😉

更多推荐

大数据【企业级360°全方位用户画像】标签开发代码抽取

本文发布于:2024-02-05 09:46:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1744750.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:企业级   画像   代码   标签   数据

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!