Hands-On Spark: Comprehensive Usage and User-Behavior Session Analysis




Copyright notice: This column is a summary and refinement of the author's (Qin Kaixin) day-to-day work. Cases are drawn from real commercial environments and shared together with tuning advice for commercial applications and cluster capacity planning. Please keep following this blog series. QQ email: 1120746959@qq; feel free to reach out for any technical exchange.

1 Scala Operators

2 Spark RDD Persistence

cache() and persist() differ in that cache() is a simplified form of persist(): under the hood, cache() calls the no-argument version of persist(), i.e. persist(MEMORY_ONLY), which persists the data in memory. To remove a cached RDD from memory, call unpersist().
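A minimal sketch of these calls (assuming an existing SparkContext sc and a made-up input path):

    import org.apache.spark.storage.StorageLevel

    val lines = sc.textFile("hdfs:///tmp/input.txt")     // made-up path
    lines.cache()                                        // equivalent to persist(StorageLevel.MEMORY_ONLY)
    // lines.persist(StorageLevel.MEMORY_AND_DISK)       // alternative: pick an explicit storage level instead of cache()
    println(lines.count())                               // the first action materializes the cache
    println(lines.filter(_.nonEmpty).count())            // this action reuses the cached partitions
    lines.unpersist()                                    // remove the cached data from memory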

3 Spark Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

The Broadcast Variable provided by Spark is read-only, and only one copy exists on each node instead of one copy per task. Its main benefit is therefore to reduce both the network transfer of the variable to each node and the memory consumption on each node. Internally, Spark also uses efficient broadcast algorithms to cut network overhead.
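A minimal sketch of typical broadcast-variable usage (assuming an existing SparkContext sc; the lookup map is invented for illustration):

    // A small dimension table that should exist once per executor rather than once per task
    val categoryNames = Map(1L -> "food", 2L -> "clothes", 3L -> "books")
    val categoryNamesBC = sc.broadcast(categoryNames)

    val clicks = sc.parallelize(Seq((1L, "session-1"), (3L, "session-2"), (2L, "session-3")))
    // Tasks read the shared read-only copy through .value instead of capturing the map in each closure
    val named = clicks.map { case (categoryId, sessionId) =>
      (categoryNamesBC.value.getOrElse(categoryId, "unknown"), sessionId)
    }
    named.collect().foreach(println)
    categoryNamesBC.destroy()    // release the broadcast once it is no longer needed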

4 Spark Accumulators

Accumulator: an accumulator is a variable that is only "added to" through an associative operation and can therefore be efficiently supported in parallel. Accumulators can be used to implement counters (as in MapReduce) or sums. The accumulator lives on the Driver: worker nodes keep sending their increments to the Driver, which does the counting (the Spark UI is created together with the SparkContext, i.e. on the Driver, which is why it can show accumulator values). Because the value exists only on the Driver, worker nodes cannot read it.

The Accumulator provided by Spark is mainly used to let multiple nodes operate on one shared variable. It only supports accumulation, but that is enough to let many tasks operate on the same variable in parallel: tasks can only add to an Accumulator and cannot read its value; only the Driver program can read the Accumulator's value.
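A minimal sketch with Spark's built-in long accumulator (assuming an existing SparkContext sc; the blank-line counting is invented for illustration):

    val blankLines = sc.longAccumulator("blankLines")

    val lines = sc.parallelize(Seq("a", "", "b", "", ""))
    val nonEmpty = lines.filter { line =>
      if (line.isEmpty) blankLines.add(1L)    // tasks may only add; they cannot read the value
      line.nonEmpty
    }
    nonEmpty.count()                                 // run an action first, otherwise nothing has been accumulated
    println(s"blank lines = ${blankLines.value}")    // only the Driver reads the accumulated value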

5 Inserting a DataFrame into a Hive Table with Spark

  • Saving a DataFrame to a Hive table

    // 1. Generate an ArrayBuffer[ProductInfo]
    private def mockProductInfo(): Array[ProductInfo] = {
      val rows = ArrayBuffer[ProductInfo]()
      val random = new Random()
      val productStatus = Array(0, 1)
      for (i <- 0 to 100) {
        val productId = i
        val productName = "product" + i
        val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}"
        rows += ProductInfo(productId, productName, extendInfo)
      }
      rows.toArray
    }

    // 2. Mock the data
    val userVisitActionData = this.mockUserVisitActionData()
    val userInfoData = this.mockUserInfo()
    val productInfoData = this.mockProductInfo()

    // 3. Convert the mock data to RDDs
    val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
    val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
    val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)

    // 4. Import Spark SQL's implicit conversions
    import spark.implicits._

    // 5. Convert the user-visit data to a DataFrame and save it to a Hive table
    val userVisitActionDF = userVisitActionRdd.toDF()
    insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)

    // 6. Convert the user-info data to a DataFrame and save it to a Hive table
    val userInfoDF = userInfoRdd.toDF()
    insertHive(spark, USER_INFO_TABLE, userInfoDF)

    // 7. Convert the product-info data to a DataFrame and save it to a Hive table
    val productInfoDF = productInfoRdd.toDF()
    insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)

    // 8. Insert a DataFrame into a Hive table
    private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
      spark.sql("DROP TABLE IF EXISTS " + tableName)
      dataDF.write.saveAsTable(tableName)
    }
  • DataSet and RDD interoperation

    object SparkRDDtoDF {

      // 1. Obtain the schema programmatically: use Spark's StructType to turn a plain RDD into a DataFrame.
      def rddToDF(sparkSession: SparkSession): DataFrame = {
        // Define the schema
        val schema = StructType(Seq(
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
        val rowRDD = sparkSession.sparkContext
          .textFile("file:/E:/scala_workspace/z_spark_study/people.txt", 2)
          .map(x => x.split(","))
          .map(x => Row(x(0), x(1).trim().toInt))
        sparkSession.createDataFrame(rowRDD, schema)
      }

      // 2. Obtain the schema by reflection: use a case class. Note that in Scala 2.10
      //    a case class supports at most 22 fields.
      case class Person(name: String, age: Int)

      def rddToDFCase(sparkSession: SparkSession): DataFrame = {
        // Import the implicits, otherwise the RDD cannot call toDF
        import sparkSession.implicits._
        val peopleRDD = sparkSession.sparkContext
          .textFile("file:/E:/scala_workspace/z_spark_study/people.txt", 2)
          .map(x => x.split(","))
          .map(x => Person(x(0), x(1).trim().toInt))
          .toDF()
        peopleRDD
      }

      // 3. Main function
      def main(agrs: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]")
        conf.set("spark.sql.warehouse.dir", "file:/E:/scala_workspace/z_spark_study/")
        conf.set("spark.sql.shuffle.partitions", "20")
        val sparkSession = SparkSession.builder()
          .appName("RDD to DataFrame")
          .config(conf)
          .getOrCreate()
        // Set the Spark log4j level in code
        sparkSession.sparkContext.setLogLevel("WARN")
        import sparkSession.implicits._
        // Case-class approach:
        // val peopleDF = rddToDFCase(sparkSession)
        // Programmatic approach:
        val peopleDF = rddToDF(sparkSession)
        peopleDF.show()
        peopleDF.select($"name", $"age").filter($"age" > 20).show()
      }
    }
  • 4 DataFrame/DataSet to RDD

    val rdd1=testDF.rdd
    val rdd2=testDS.rdd
  • 5 RDD to DataFrame

    import spark.implicits._
    val testDF = rdd.map { line =>
      (line._1, line._2)
    }.toDF("col1", "col2")
  • 6 DataSet to DataFrame

    import spark.implicits._
    val testDF = testDS.toDF
  • 7 DataFrame to DataSet

    import spark.implicits._
    // Define the field names and types
    case class Coltest(col1: String, col2: Int) extends Serializable
    val testDS = testDF.as[Coltest]

6 User-Defined Aggregate Functions (UDAF)

    1. Weakly typed UDAF

    /**
     * User-defined aggregate function
     */
    class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {

      /**
       * Data type of the aggregate function's input parameters
       */
      override def inputSchema: StructType = StructType(StructField("cityInfo", StringType) :: Nil)

      /**
       * Type of the values in the aggregation buffer,
       * i.e. the data type handled during intermediate aggregation
       */
      override def bufferSchema: StructType = StructType(StructField("bufferCityInfo", StringType) :: Nil)

      /**
       * Data type of the function's return value
       */
      override def dataType: DataType = StringType

      /**
       * Consistency flag: if true, the same input always produces the same result
       */
      override def deterministic: Boolean = true

      /**
       * Set the initial value of the aggregation buffer.
       * The semantics must hold that merging two initial buffers with the merge method below
       * still yields an initial buffer. For example, if the initial value were 1 and merge added
       * values, merging two initial buffers would give 2, which is no longer the initial buffer;
       * such an initial value would be wrong. That is why it is also called the "zero value".
       */
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = ""
      }

      /**
       * Update the buffer with the input row, similar to combineByKey
       */
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // The city-info string concatenated so far
        var bufferCityInfo = buffer.getString(0)
        // The city info that was just passed in
        val cityInfo = input.getString(0)

        // Deduplication: only append a city info that has not been concatenated before
        if (!bufferCityInfo.contains(cityInfo)) {
          if ("".equals(bufferCityInfo))
            bufferCityInfo += cityInfo
          else {
            // e.g. 1:Beijing
            //      1:Beijing,2:Shanghai
            bufferCityInfo += "," + cityInfo
          }
          buffer.update(0, bufferCityInfo)
        }
      }

      /**
       * Merge two buffers: merge buffer2 into buffer1. Used when combining the aggregation
       * results of two partitions, similar to reduceByKey. Note that this method has no
       * return value; the implementation must fold buffer2 into buffer1.
       */
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        var bufferCityInfo1 = buffer1.getString(0)
        val bufferCityInfo2 = buffer2.getString(0)

        for (cityInfo <- bufferCityInfo2.split(",")) {
          if (!bufferCityInfo1.contains(cityInfo)) {
            if ("".equals(bufferCityInfo1)) {
              bufferCityInfo1 += cityInfo
            } else {
              bufferCityInfo1 += "," + cityInfo
            }
          }
        }
        buffer1.update(0, bufferCityInfo1)
      }

      /**
       * Compute and return the final aggregation result
       */
      override def evaluate(buffer: Row): Any = {
        buffer.getString(0)
      }
    }
    2. Strongly typed UDAF

    // Define the case classes
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)

    object MyAverage extends Aggregator[Employee, Average, Double] {

      /**
       * The zero value (initial buffer) of the aggregation
       */
      def zero: Average = Average(0L, 0L)

      /**
       * Update the buffer with an incoming value
       */
      def reduce(buffer: Average, employee: Employee): Average = {
        buffer.sum += employee.salary
        buffer.count += 1
        buffer
      }

      /**
       * Merge two buffers: fold the value of buffer2 into buffer1
       */
      def merge(b1: Average, b2: Average): Average = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
      }

      /**
       * Compute the final output
       */
      def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

      /**
       * Encoder for the intermediate value type, which must be a case class.
       * Encoders.product is the encoder for Scala tuples and case classes.
       */
      def bufferEncoder: Encoder[Average] = Encoders.product

      /**
       * Encoder for the final output value
       */
      def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

7 Window Functions

  • A window function defines a window for each row (here a window means the set of rows the computation operates on). It operates on a group of values without requiring a GROUP BY clause, and can return both the base row's columns and the aggregated columns in the same row.

  • A window function is invoked as: function_name(column) OVER(options)

      Category 1: aggregate window functions -> aggregate_function(column) OVER (options); here the options may be a PARTITION BY clause, but not an ORDER BY clause.

    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
      val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
      import sparkSession.implicits._
      val scoreDF = sparkSession.sparkContext.makeRDD(Array(
        Score("a1", 1, 80),
        Score("a2", 1, 78),
        Score("a3", 1, 95),
        Score("a4", 2, 74),
        Score("a5", 2, 92),
        Score("a6", 3, 99),
        Score("a7", 3, 99),
        Score("a8", 3, 45),
        Score("a9", 3, 55),
        Score("a10", 3, 78)
      )).toDF("name", "class", "score")
      scoreDF.createOrReplaceTempView("score")
      scoreDF.show()
    }

      The OVER keyword turns an aggregate function into an aggregate window function instead of a plain aggregate:

    sparkSession.sql("select name, class, score, count(name) over() name_count from score")

      The partitions created by the PARTITION BY clause are independent of the result set; they exist only for the aggregate computation, and partitions created by different window functions do not affect each other:

    sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()

    +----+-----+-----+----------+
    |name|class|score|name_count|
    +----+-----+-----+----------+
    |  a1|    1|   80|         3|
    |  a2|    1|   78|         3|
    |  a3|    1|   95|         3|
    |  a6|    3|   99|         5|
    |  a7|    3|   99|         5|
    |  a8|    3|   45|         5|
    |  a9|    3|   55|         5|
    | a10|    3|   78|         5|
    |  a4|    2|   74|         2|
    |  a5|    2|   92|         2|
    +----+-----+-----+----------+

      Category 2: ranking window functions -> ranking_function(column) OVER(options); here the options may be an ORDER BY clause, or OVER(PARTITION BY clause ORDER BY clause), but not a PARTITION BY clause alone. The supported ranking window functions are ROW_NUMBER (row number), RANK (rank), DENSE_RANK (dense rank) and NTILE (bucketed rank).

    sparkSession.sql("select name, class, score, row_number() over(order by score) rank from score").show()

    +----+-----+-----+----+
    |name|class|score|rank|
    +----+-----+-----+----+
    |  a8|    3|   45|   1|
    |  a9|    3|   55|   2|
    |  a4|    2|   74|   3|
    |  a2|    1|   78|   4|
    | a10|    3|   78|   5|
    |  a1|    1|   80|   6|
    |  a5|    2|   92|   7|
    |  a3|    1|   95|   8|
    |  a6|    3|   99|   9|
    |  a7|    3|   99|  10|
    +----+-----+-----+----+

    sparkSession.sql("select name, class, score, rank() over(order by score) rank from score").show()

    +----+-----+-----+----+
    |name|class|score|rank|
    +----+-----+-----+----+
    |  a8|    3|   45|   1|
    |  a9|    3|   55|   2|
    |  a4|    2|   74|   3|
    |  a2|    1|   78|   4|
    | a10|    3|   78|   4|
    |  a1|    1|   80|   6|
    |  a5|    2|   92|   7|
    |  a3|    1|   95|   8|
    |  a6|    3|   99|   9|
    |  a7|    3|   99|   9|
    +----+-----+-----+----+

    sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from score").show()

    +----+-----+-----+----+
    |name|class|score|rank|
    +----+-----+-----+----+
    |  a8|    3|   45|   1|
    |  a9|    3|   55|   2|
    |  a4|    2|   74|   3|
    |  a2|    1|   78|   4|
    | a10|    3|   78|   4|
    |  a1|    1|   80|   5|
    |  a5|    2|   92|   6|
    |  a3|    1|   95|   7|
    |  a6|    3|   99|   8|
    |  a7|    3|   99|   8|
    +----+-----+-----+----+

    sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from score").show()

    +----+-----+-----+----+
    |name|class|score|rank|
    +----+-----+-----+----+
    |  a8|    3|   45|   1|
    |  a9|    3|   55|   1|
    |  a4|    2|   74|   2|
    |  a2|    1|   78|   2|
    | a10|    3|   78|   3|
    |  a1|    1|   80|   3|
    |  a5|    2|   92|   4|
    |  a3|    1|   95|   4|
    |  a6|    3|   99|   5|
    |  a7|    3|   99|   6|
    +----+-----+-----+----+

8 DStream updateStateByKey Operator (the checkpoint mechanism must be enabled)

    object updateStateByKeyWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("Wordcount")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("hdfs://s100:8020/wordcount_checkpoint")

        val lines = ssc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCount = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          var newValue = state.getOrElse(0)
          for (value <- values) {
            newValue += value
          }
          Option(newValue)
        })
        wordCount.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

9 A Comprehensive E-commerce Case Study

9.1 Raw Data Model
  • User behavior table model (each click action generates multiple records; one session corresponds to multiple page ids)

  • User table

  • Product table

    1. Click session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:04:42,null,37,17,null,null,null,null,7
    2. Search session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:29:50,重庆小面,-1,-1,null,null,null,null,1
    3. Order session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,6,2018-02-11 17:50:10,null,-1,-1,61,71,null,null,2
    4. Payment session
    2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,4,2018-02-11 17:18:24,null,-1,-1,null,null,83,17,1

9.2 Data Processing Model
  • User visit behavior model (each Session_Id corresponds to one user, so all of a user's actions can be aggregated together)

  • One Session_Id corresponds to multiple action_time values, from which each session's visit duration Visit_Length can be derived.

  • One Session_Id corresponds to multiple page_id values, from which Step_Length and metrics such as conversion rate can be computed.

      Session_Id | Search_Keywords | Click_Category_Id | Visit_Length | Step_Length | Start_Time
  • First compute the Visit_Length and Step_Length for each Session_Id (see the sketch after this list).

  • After joining user information and applying the custom filters, use an accumulator to compute visit_length_ratio and step_length_ratio.
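The production code later in this article derives these values inside a large aggregation method; as a simplified illustration, here is a minimal, self-contained sketch of computing Visit_Length (in seconds) and Step_Length per session from (session_id, action_time) pairs. The toy data and field layout are assumptions for illustration, not the real schema (assuming an existing SparkContext sc):

    import java.text.SimpleDateFormat

    // Toy input: (session_id, action_time), one row per page action
    val actions = sc.parallelize(Seq(
      ("s1", "2018-02-11 17:04:42"), ("s1", "2018-02-11 17:05:10"), ("s1", "2018-02-11 17:06:02"),
      ("s2", "2018-02-11 17:29:50"), ("s2", "2018-02-11 17:30:05")
    ))

    def toMillis(t: String): Long =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(t).getTime

    // One record per session: visit length in seconds and step length (number of actions)
    val sessionStats = actions.groupByKey().map { case (sessionId, times) =>
      val millis = times.map(toMillis)
      val visitLength = (millis.max - millis.min) / 1000    // Visit_Length
      val stepLength = times.size                           // Step_Length
      (sessionId, (visitLength, stepLength))
    }
    sessionStats.collect().foreach(println)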
9.3 Accumulator Implementation
  • The accumulator maintains a Map on the Driver that centrally stores the accumulated counts of the visit-length and step-length buckets (e.g. 1s_3s or 1_3_ratio) across all sessions.

  • Each session contributes to one such bucket (e.g. 1s_3s or 1_3_ratio).

    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    /**
     * Custom accumulator
     */
    class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

      // Holds all of the aggregated statistics
      private val aggrStatMap = mutable.HashMap[String, Int]()

      override def isZero: Boolean = {
        aggrStatMap.isEmpty
      }

      override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
        val newAcc = new SessionAggrStatAccumulator
        aggrStatMap.synchronized {
          newAcc.aggrStatMap ++= this.aggrStatMap
        }
        newAcc
      }

      override def reset(): Unit = {
        aggrStatMap.clear()
      }

      // Update operation on the mutable.HashMap[String, Int]
      override def add(v: String): Unit = {
        if (!aggrStatMap.contains(v))
          aggrStatMap += (v -> 0)
        aggrStatMap.update(v, aggrStatMap(v) + 1)
      }

      override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
        other match {
          case acc: SessionAggrStatAccumulator => {
            (this.aggrStatMap /: acc.value) { case (map, (k, v)) => map += (k -> (v + map.getOrElse(k, 0))) }
          }
        }
      }

      override def value: mutable.HashMap[String, Int] = {
        this.aggrStatMap
      }
    }
9.4 Session Analysis Module
  • Obtain the statistics task parameters (for convenience they are read directly from a configuration file; in production they would come from a scheduling platform).

    task.params.json={startDate:"2018-02-01", \
      endDate:"2018-02-28", \
      startAge: 20, \
      endAge: 50, \
      professionals: "", \
      cities: "", \
      sex:"", \
      keywords:"", \
      categoryIds:"", \
      targetPageFlow:"1,2,3,4,5,6,7"}

    val taskParam = JSONObject.fromObject(jsonStr)
  • Create the Spark client

    // Build the Spark context
    val sparkConf = new SparkConf().setAppName("SessionAnalyzer").setMaster("local[*]")

    // Create the Spark client
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val sc = spark.sparkContext
  • Create the custom accumulator that implements the statistics over all the data; note that the accumulator is also evaluated lazily.

     val sessionAggrStatAccumulator = new SessionAggrStatAccumulator
  • Register the custom accumulator

      sc.register(sessionAggrStatAccumulator, "sessionAggrStatAccumulator")
  • First, query the behavior data within the specified date range from the user_visit_action Hive table.

    def getParam(jsonObject: JSONObject, field: String): String = {
      jsonObject.getString(field)
    }

    def getActionRDDByDateRange(spark: SparkSession, taskParam: JSONObject): RDD[UserVisitAction] = {
      val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
      val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
      import spark.implicits._
      spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'")
        .as[UserVisitAction]
        .rdd
    }

    // The resulting RDD still carries the schema information
    val actionRDD = this.getActionRDDByDateRange(spark, taskParam)

    // Re-key the user behavior records as K-V pairs
    val sessionid2actionRDD = actionRDD.map(item => (item.session_id, item))
  • Cache the data in memory

      sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY)
  • Aggregate the data to session granularity (transform and aggregate the records to obtain, per session, the search keyword list, the clicked category list, the visit start time, the step length and the visit length, ready for filtering).

      格式为<sessionid,(sessionid,searchKeywords,clickCategoryIds,age,professional,city,sex)>def aggregateBySession(spark: SparkSession, sessinoid2actionRDD: RDD[(String, UserVisitAction)]): RDD[(String, String)] = {// 对行为数据按session粒度进行分组val sessionid2ActionsRDD = sessinoid2actionRDD.groupByKey()// 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来,<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>val userid2PartAggrInfoRDD = sessionid2ActionsRDD.map { case (sessionid, userVisitActions) =>val searchKeywordsBuffer = new StringBuffer("")val clickCategoryIdsBuffer = new StringBuffer("")var userid = -1L// session的起始和结束时间var startTime: Date = nullvar endTime: Date = null// session的访问步长var stepLength = 0// 遍历session所有的访问行为userVisitActions.foreach { userVisitAction =>if (userid == -1L) {userid = userVisitAction.user_id}val searchKeyword = userVisitAction.search_keywordval clickCategoryId = userVisitAction.click_category_id// 实际上这里要对数据说明一下// 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的// 其实,只有搜索行为,是有searchKeyword字段的// 只有点击品类的行为,是有clickCategoryId字段的// 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的// 我们决定是否将搜索词或点击品类id拼接到字符串中去// 首先要满足:不能是null值// 其次,之前的字符串中还没有搜索词或者点击品类idif (StringUtils.isNotEmpty(searchKeyword)) {if (!searchKeywordsBuffer.toString.contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword + ",")}}if (clickCategoryId != null && clickCategoryId != -1L) {if (!clickCategoryIdsBuffer.toString.contains(clickCategoryId.toString)) {clickCategoryIdsBuffer.append(clickCategoryId + ",")}}// 计算session开始和结束时间val actionTime = DateUtils.parseTime(userVisitAction.action_time)if (startTime == null) {startTime = actionTime}if (endTime == null) {endTime = actionTime}if (actionTime.before(startTime)) {startTime = actionTime}if (actionTime.after(endTime)) {endTime = actionTime}// 计算session访问步长stepLength += 1}val searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString)val clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString)// 计算session访问时长(秒)val visitLength = (endTime.getTime() - startTime.getTime()) / 1000// 聚合数据,使用key=value|key=valueval partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|" +Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|" +Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)(userid, partAggrInfo);}// 查询所有用户数据,并映射成<userid,Row>的格式import spark.implicits._val userid2InfoRDD = spark.sql("select * from user_info").as[UserInfo].rdd.map(item => (item.user_id, item))// 将session粒度聚合数据,与用户信息进行joinval userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD);// 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据val sessionid2FullAggrInfoRDD = userid2FullInfoRDD.map { case (uid, (partAggrInfo, userInfo)) =>val sessionid = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)val fullAggrInfo = partAggrInfo + "|" +Constants.FIELD_AGE + "=" + userInfo.age + "|" +Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +Constants.FIELD_CITY + "=" + userInfo.city + "|" +Constants.FIELD_SEX + "=" + userInfo.sex(sessionid, fullAggrInfo)}sessionid2FullAggrInfoRDD}
  • Filter the user behavior data according to the task configuration and, during filtering, update the statistics held in the accumulator.

    按照年龄、职业、城市范围、性别、搜索词、点击品类这些条件过滤后的最终结果def filterSessionAndAggrStat(sessionid2AggrInfoRDD: RDD[(String, String)],taskParam: JSONObject,sessionAggrStatAccumulator: AccumulatorV2[String, mutable.HashMap[String, Int]]): RDD[(String, String)] = {// 获取查询任务中的配置val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)var _parameter = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +(if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +(if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +(if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +(if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +(if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +(if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")if (_parameter.endsWith("\\|")) {_parameter = _parameter.substring(0, _parameter.length() - 1)}val parameter = _parameter// 根据筛选参数进行过滤val filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter { case (sessionid, aggrInfo) =>// 接着,依次按照筛选条件进行过滤// 按照年龄范围进行过滤(startAge、endAge)var success = trueif (!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE))success = false// 按照职业范围进行过滤(professionals)// 互联网,IT,软件// 互联网if (!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS))success = false// 按照城市范围进行过滤(cities)// 北京,上海,广州,深圳// 成都if (!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CITIES))success = false// 按照性别进行过滤// 男/女// 男,女if (!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX))success = false// 按照搜索词进行过滤// 我们的session可能搜索了 火锅,蛋糕,烧烤// 我们的筛选条件可能是 火锅,串串香,iphone手机// 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中// 任何一个搜索词相当,即通过if (!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS))success = false// 按照点击品类id进行过滤if (!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS))success = false// 如果符合任务搜索需求if (success) {sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);// 计算访问时长范围def calculateVisitLength(visitLength: Long) {if (visitLength >= 1 && visitLength <= 3) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);} else if (visitLength >= 4 && visitLength <= 6) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);} else if (visitLength >= 7 && visitLength <= 9) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);} else if (visitLength >= 10 && visitLength <= 30) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);} else if (visitLength > 30 && visitLength <= 60) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);} else if (visitLength > 60 && visitLength <= 180) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);} else if (visitLength > 180 && visitLength <= 600) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);} else if (visitLength > 600 && visitLength <= 1800) 
{sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);} else if (visitLength > 1800) {sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);}}// 计算访问步长范围def calculateStepLength(stepLength: Long) {if (stepLength >= 1 && stepLength <= 3) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);} else if (stepLength >= 4 && stepLength <= 6) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);} else if (stepLength >= 7 && stepLength <= 9) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);} else if (stepLength >= 10 && stepLength <= 30) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);} else if (stepLength > 30 && stepLength <= 60) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);} else if (stepLength > 60) {sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);}}// 计算出session的访问时长和访问步长的范围,并进行相应的累加val visitLength = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLongval stepLength = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLongcalculateVisitLength(visitLength)calculateStepLength(stepLength)}success}filteredSessionid2AggrInfoRDD
    }
  • Cache the hard-won aggregated and filtered statistics in memory so the expensive computation is not repeated.

      filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY)
  • Obtain the visit detail data for the filtered sessions (i.e. the original records of the sessions that passed the filter).

    def getSessionid2detailRDD(sessionid2aggrInfoRDD: RDD[(String, String)],
                               sessionid2actionRDD: RDD[(String, UserVisitAction)]): RDD[(String, UserVisitAction)] = {
      sessionid2aggrInfoRDD.join(sessionid2actionRDD).map(item => (item._1, item._2._2))
    }

    sessionid2detailRDD.persist(StorageLevel.MEMORY_ONLY)
  • Business feature 1, built on the accumulator: compute the proportion of sessions in each bucket and write the results to MySQL.

          calculateAndPersistAggrStat(spark, sessionAggrStatAccumulator.value, taskUUID)def calculateAndPersistAggrStat(spark: SparkSession, value: mutable.HashMap[String, Int], taskUUID: String) {// 从Accumulator统计串中获取值val session_count = value(Constants.SESSION_COUNT).toDoubleval visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)// 计算各个访问时长和访问步长的范围val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)// 将统计结果封装为Domain对象val sessionAggrStat = SessionAggrStat(taskUUID,session_count.toInt, visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)import spark.implicits._val sessionAggrStatRDD = spark.sparkContext.makeRDD(Array(sessionAggrStat))sessionAggrStatRDD.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "session_aggr_stat").option("user", 
ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()}
  • Randomly and evenly sample sessions at session granularity (note that each session may have multiple action records). A simplified sketch of the time-proportional sampling step follows the code below.

      randomExtractSession(spark, taskUUID, filteredSessionid2AggrInfoRDD, sessionid2detailRDD)def randomExtractSession(spark: SparkSession, taskUUID: String, sessionid2AggrInfoRDD: RDD[(String, String)], sessionid2actionRDD: RDD[(String, UserVisitAction)]) {// 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,aggrInfo>格式的RDDval time2sessionidRDD = sessionid2AggrInfoRDD.map { case (sessionid, aggrInfo) =>val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)// 将key改为yyyy-MM-dd_HH的形式(小时粒度)val dateHour = DateUtils.getDateHour(startTime)(dateHour, aggrInfo)}// 得到每天每小时的session数量// countByKey()计算每个不同的key有多少个数据// countMap<yyyy-MM-dd_HH, count>val countMap = time2sessionidRDD.countByKey()// 第二步,使用按时间比例随机抽取算法,计算出每天每小时要抽取session的索引,将<yyyy-MM-dd_HH,count>格式的map,转换成<yyyy-MM-dd,<HH,count>>的格式// dateHourCountMap <yyyy-MM-dd,<HH,count>>val dateHourCountMap = mutable.HashMap[String, mutable.HashMap[String, Long]]()for ((dateHour, count) <- countMap) {val date = dateHour.split("_")(0)val hour = dateHour.split("_")(1)// 通过模式匹配实现了if的功能dateHourCountMap.get(date) match {// 对应日期的数据不存在,则新增case None => dateHourCountMap(date) = new mutable.HashMap[String, Long](); dateHourCountMap(date) += (hour -> count)// 对应日期的数据存在,则更新// 如果有值,Some(hourCountMap)将值取到了hourCountMap中case Some(hourCountMap) => hourCountMap += (hour -> count)}}// 按时间比例随机抽取算法,总共要抽取100个session,先按照天数,进行平分// 获取每一天要抽取的数量val extractNumberPerDay = 100 / dateHourCountMap.size// dateHourExtractMap[天,[小时,index列表]]val dateHourExtractMap = mutable.HashMap[String, mutable.HashMap[String, mutable.ListBuffer[Int]]]()val random = new Random()/*** 根据每个小时应该抽取的数量,来产生随机值* 遍历每个小时,填充Map<date,<hour,(3,5,20,102)>>* @param hourExtractMap 主要用来存放生成的随机值* @param hourCountMap   每个小时的session总数* @param sessionCount   当天所有的seesion总数*/def hourExtractMapFunc(hourExtractMap: mutable.HashMap[String, mutable.ListBuffer[Int]], hourCountMap: mutable.HashMap[String, Long], sessionCount: Long) {for ((hour, count) <- hourCountMap) {// 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量// 就可以计算出,当前小时需要抽取的session数量var hourExtractNumber = ((count / sessionCount.toDouble) * extractNumberPerDay).toIntif (hourExtractNumber > count) {hourExtractNumber = count.toInt}// 仍然通过模式匹配实现有则追加,无则新建hourExtractMap.get(hour) match {case None => hourExtractMap(hour) = new mutable.ListBuffer[Int]();// 根据数量随机生成下标for (i <- 0 to hourExtractNumber) {var extractIndex = random.nextInt(count.toInt);// 一旦随机生成的index已经存在,重新获取,直到获取到之前没有的indexwhile (hourExtractMap(hour).contains(extractIndex)) {extractIndex = random.nextInt(count.toInt);}hourExtractMap(hour) += (extractIndex)}case Some(extractIndexList) =>for (i <- 0 to hourExtractNumber) {var extractIndex = random.nextInt(count.toInt);// 一旦随机生成的index已经存在,重新获取,直到获取到之前没有的indexwhile (hourExtractMap(hour).contains(extractIndex)) {extractIndex = random.nextInt(count.toInt);}hourExtractMap(hour) += (extractIndex)}}}}// session随机抽取功能for ((date, hourCountMap) <- dateHourCountMap) {// 计算出这一天的session总数val sessionCount = hourCountMap.values.sum// dateHourExtractMap[天,[小时,小时列表]]dateHourExtractMap.get(date) match {case None => dateHourExtractMap(date) = new mutable.HashMap[String, mutable.ListBuffer[Int]]();// 更新indexhourExtractMapFunc(dateHourExtractMap(date), hourCountMap, sessionCount)case Some(hourExtractMap) => hourExtractMapFunc(hourExtractMap, hourCountMap, sessionCount)}}/* 至此,index获取完毕 *///将Map进行广播val dateHourExtractMapBroadcast = spark.sparkContext.broadcast(dateHourExtractMap)// time2sessionidRDD <yyyy-MM-dd_HH,aggrInfo>// 
执行groupByKey算子,得到<yyyy-MM-dd_HH,(session aggrInfo)>val time2sessionsRDD = time2sessionidRDD.groupByKey()// 第三步:遍历每天每小时的session,然后根据随机索引进行抽取,我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据val sessionRandomExtract = time2sessionsRDD.flatMap { case (dateHour, items) =>val date = dateHour.split("_")(0)val hour = dateHour.split("_")(1)// 从广播变量中提取出数据val dateHourExtractMap = dateHourExtractMapBroadcast.value// 获取指定天对应的指定小时的indexList// 当前小时需要的index集合val extractIndexList = dateHourExtractMap.get(date).get(hour)// index是在外部进行维护var index = 0val sessionRandomExtractArray = new ArrayBuffer[SessionRandomExtract]()// 开始遍历所有的aggrInfofor (sessionAggrInfo <- items) {// 如果筛选List中包含当前的index,则提取此sessionAggrInfo中的数据if (extractIndexList.contains(index)) {val sessionid = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID)val starttime = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_START_TIME)val searchKeywords = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)val clickCategoryIds = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)sessionRandomExtractArray += SessionRandomExtract(taskUUID, sessionid, starttime, searchKeywords, clickCategoryIds)}// index自增index += 1}sessionRandomExtractArray}/* 将抽取后的数据保存到MySQL */// 引入隐式转换,准备进行RDD向Dataframe的转换import spark.implicits._// 为了方便地将数据保存到MySQL数据库,将RDD数据转换为DataframesessionRandomExtract.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "session_random_extract").option("user", ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()// 提取抽取出来的数据中的sessionIdval extractSessionidsRDD = sessionRandomExtract.map(item => (item.sessionid, item.sessionid))// 第四步:获取抽取出来的session的明细数据// 根据sessionId与详细数据进行聚合val extractSessionDetailRDD = extractSessionidsRDD.join(sessionid2actionRDD)// 对extractSessionDetailRDD中的数据进行聚合,提炼有价值的明细数据val sessionDetailRDD = extractSessionDetailRDD.map { case (sid, (sessionid, userVisitAction)) =>SessionDetail(taskUUID, userVisitAction.user_id, userVisitAction.session_id,userVisitAction.page_id, userVisitAction.action_time, userVisitAction.search_keyword,userVisitAction.click_category_id, userVisitAction.click_product_id, userVisitAction.order_category_ids,userVisitAction.order_product_ids, userVisitAction.pay_category_ids, userVisitAction.pay_product_ids)}// 将明细数据保存到MySQL中sessionDetailRDD.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "session_detail").option("user", ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()}
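Because the full method above is hard to read in its compressed form, here is a minimal sketch of just the time-proportional random-index step it performs for one day: given each hour's session count and a daily sampling budget, allocate the budget proportionally per hour and draw distinct random indices. The map contents and the budget are illustrative assumptions, not the production values:

    import scala.collection.mutable
    import scala.util.Random

    // Per-hour session counts for one day (illustrative numbers)
    val hourCountMap = mutable.HashMap("17" -> 40L, "18" -> 60L)
    val sessionCountForDay = hourCountMap.values.sum
    val extractNumberPerDay = 20    // sessions to sample for this day (illustrative budget)

    val random = new Random()
    val hourExtractMap = mutable.HashMap[String, mutable.ListBuffer[Int]]()

    for ((hour, count) <- hourCountMap) {
      // This hour's share of the daily budget, proportional to its share of the day's sessions
      var hourExtractNumber = ((count / sessionCountForDay.toDouble) * extractNumberPerDay).toInt
      if (hourExtractNumber > count) hourExtractNumber = count.toInt

      val indexes = hourExtractMap.getOrElseUpdate(hour, new mutable.ListBuffer[Int]())
      while (indexes.size < hourExtractNumber) {
        val extractIndex = random.nextInt(count.toInt)
        if (!indexes.contains(extractIndex)) indexes += extractIndex    // keep the drawn indices distinct
      }
    }
    // hourExtractMap now maps each hour to the random positions to pull from that hour's session group
    println(hourExtractMap)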
  • Compute the top-10 popular categories

     排序case class CategorySortKey(val clickCount: Long, val orderCount: Long, val payCount: Long) extends Ordered[CategorySortKey] {override def compare(that: CategorySortKey): Int = {if (this.clickCount - that.clickCount != 0) {return (this.clickCount - that.clickCount).toInt} else if (this.orderCount - that.orderCount != 0) {return (this.orderCount - that.orderCount).toInt} else if (this.payCount - that.payCount != 0) {return (this.payCount - that.payCount).toInt}0}}获取各个品类的点击次数RDDdef getClickCategoryId2CountRDD(sessionid2detailRDD: RDD[(String, UserVisitAction)]): RDD[(Long, Long)] = {// 只将点击行为过滤出来val clickActionRDD = sessionid2detailRDD.filter { case (sessionid, userVisitAction) => userVisitAction.click_category_id != null }// 获取每种类别的点击次数// map阶段:(品类ID,1L)val clickCategoryIdRDD = clickActionRDD.map { case (sessionid, userVisitAction) => (userVisitAction.click_category_id, 1L) }// 计算各个品类的点击次数// reduce阶段:对map阶段的数据进行汇总// (品类ID1,次数) (品类ID2,次数) (品类ID3,次数) ... ... (品类ID4,次数)clickCategoryIdRDD.reduceByKey(_ + _)}连接品类RDD与数据RDDdef joinCategoryAndData(categoryidRDD: RDD[(Long, Long)], clickCategoryId2CountRDD: RDD[(Long, Long)], orderCategoryId2CountRDD: RDD[(Long, Long)], payCategoryId2CountRDD: RDD[(Long, Long)]): RDD[(Long, String)] = {// 将所有品类信息与点击次数信息结合【左连接】val clickJoinRDD = categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD).map { case (categoryid, (cid, optionValue)) =>val clickCount = if (optionValue.isDefined) optionValue.get else 0Lval value = Constants.FIELD_CATEGORY_ID + "=" + categoryid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount(categoryid, value)}// 将所有品类信息与订单次数信息结合【左连接】val orderJoinRDD = clickJoinRDD.leftOuterJoin(orderCategoryId2CountRDD).map { case (categoryid, (ovalue, optionValue)) =>val orderCount = if (optionValue.isDefined) optionValue.get else 0Lval value = ovalue + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount(categoryid, value)}// 将所有品类信息与付款次数信息结合【左连接】val payJoinRDD = orderJoinRDD.leftOuterJoin(payCategoryId2CountRDD).map { case (categoryid, (ovalue, optionValue)) =>val payCount = if (optionValue.isDefined) optionValue.get else 0Lval value = ovalue + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount(categoryid, value)}payJoinRDD
    }def getTop10Category(spark: SparkSession, taskid: String, sessionid2detailRDD: RDD[(String, UserVisitAction)]): Array[(CategorySortKey, String)] = {// 第一步:获取每一个Sessionid 点击过、下单过、支付过的数量// 获取所有产生过点击、下单、支付中任意行为的商品类别val categoryidRDD = sessionid2detailRDD.flatMap { case (sessionid, userVisitAction) =>val list = ArrayBuffer[(Long, Long)]()// 一个session中点击的商品IDif (userVisitAction.click_category_id != null) {list += ((userVisitAction.click_category_id, userVisitAction.click_category_id))}// 一个session中下单的商品ID集合if (userVisitAction.order_category_ids != null) {for (orderCategoryId <- userVisitAction.order_category_ids.split(","))list += ((orderCategoryId.toLong, orderCategoryId.toLong))}// 一个session中支付的商品ID集合if (userVisitAction.pay_category_ids != null) {for (payCategoryId <- userVisitAction.pay_category_ids.split(","))list += ((payCategoryId.toLong, payCategoryId.toLong))}list}// 对重复的categoryid进行去重// 得到了所有被点击、下单、支付的商品的品类val distinctCategoryIdRDD = categoryidRDD.distinct// 第二步:计算各品类的点击、下单和支付的次数// 计算各个品类的点击次数val clickCategoryId2CountRDD = getClickCategoryId2CountRDD(sessionid2detailRDD)// 计算各个品类的下单次数val orderCategoryId2CountRDD = getOrderCategoryId2CountRDD(sessionid2detailRDD)// 计算各个品类的支付次数val payCategoryId2CountRDD = getPayCategoryId2CountRDD(sessionid2detailRDD)// 第三步:join各品类与它的点击、下单和支付的次数// distinctCategoryIdRDD中是所有产生过点击、下单、支付行为的商品类别// 通过distinctCategoryIdRDD与各个统计数据的LeftJoin保证数据的完整性val categoryid2countRDD = joinCategoryAndData(distinctCategoryIdRDD, clickCategoryId2CountRDD, orderCategoryId2CountRDD, payCategoryId2CountRDD);// 第四步:自定义二次排序key// 第五步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)// 创建用于二次排序的联合key —— (CategorySortKey(clickCount, orderCount, payCount), line)// 按照:点击次数 -> 下单次数 -> 支付次数 这一顺序进行二次排序val sortKey2countRDD = categoryid2countRDD.map { case (categoryid, line) =>val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLongval orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLongval payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong(CategorySortKey(clickCount, orderCount, payCount), line)}// 降序排序val sortedCategoryCountRDD = sortKey2countRDD.sortByKey(false)// 第六步:用take(10)取出top10热门品类,并写入MySQLval top10CategoryList = sortedCategoryCountRDD.take(10)val top10Category = top10CategoryList.map { case (categorySortKey, line) =>val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLongval clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLongval orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLongval payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLongTop10Category(taskid, categoryid, clickCount, orderCount, payCount)}// 将Map结构转化为RDDval top10CategoryRDD = spark.sparkContext.makeRDD(top10Category)// 写入MySQL之前,将RDD转化为Dataframeimport spark.implicits._top10CategoryRDD.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "top10_category").option("user", ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()top10CategoryList}
  • Compute the top-10 active sessions for the top-10 popular categories (first join against the popular categories to obtain their sessions, then for each category rank its sessions by click count and take the top 10).

    1. Regroup sessionid2detailRDD and compute the accumulated occurrence count per category
       (one SessionId maps to multiple action records: sessionid -> iter(userVisitAction)).

    val sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey()

       After regrouping, emit (categoryid, sessionid + "," + count).

    2. Obtain, for the top-10 popular categories, the click counts per session (this shrinks the data set);
       the result contains many duplicate keys.

    val top10CategorySessionCountRDD = top10CategoryIdRDD
      .join(categoryid2sessionCountRDD)
      .map { case (cid, (ccid, value)) => (cid, value) }

    3. Consolidate the duplicate keys: group by category to get, per category, an iterator over all
       of its (sessionid + "," + count) strings.

    val top10CategorySessionCountsRDD = top10CategorySessionCountRDD.groupByKey()

    4. Sort the sessions of each category by click count and take the top 10.

    val top10Sessions = clicks.toList.sortWith(_.split(",")(1) > _.split(",")(1)).take(10)

      def getTop10Session(spark: SparkSession, taskid: String, top10CategoryList: Array[(CategorySortKey, String)], sessionid2ActionRDD: RDD[(String, UserVisitAction)]) {// 第一步:将top10热门品类的id,生成一份RDD// 获得所有需要求的category集合val top10CategoryIdRDD = spark.sparkContext.makeRDD(top10CategoryList.map { case (categorySortKey, line) =>val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLong;(categoryid, categoryid)})// 第二步:计算top10品类被各session点击的次数// sessionid2ActionRDD是符合过滤(职业、年龄等)条件的完整数据// sessionid2detailRDD ( sessionId, userAction )val sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey()// 获取每个品类被每一个Session点击的次数val categoryid2sessionCountRDD = sessionid2ActionsRDD.flatMap { case (sessionid, userVisitActions) =>val categoryCountMap = new mutable.HashMap[Long, Long]()// userVisitActions中聚合了一个session的所有用户行为数据// 遍历userVisitActions是提取session中的每一个用户行为,并对每一个用户行为中的点击事件进行计数for (userVisitAction <- userVisitActions) {// 如果categoryCountMap中尚不存在此点击品类,则新增品类if (!categoryCountMap.contains(userVisitAction.click_category_id))categoryCountMap.put(userVisitAction.click_category_id, 0)// 如果categoryCountMap中已经存在此点击品类,则进行累加if (userVisitAction.click_category_id != null && userVisitAction.click_category_id != -1L) {categoryCountMap.update(userVisitAction.click_category_id, categoryCountMap(userVisitAction.click_category_id) + 1)}}// 对categoryCountMap中的数据进行格式转化for ((categoryid, count) <- categoryCountMap)yield (categoryid, sessionid + "," + count)}// 通过top10热门品类top10CategoryIdRDD与完整品类点击统计categoryid2sessionCountRDD进行join,仅获取热门品类的数据信息// 获取到to10热门品类,被各个session点击的次数【将数据集缩小】val top10CategorySessionCountRDD = top10CategoryIdRDD.join(categoryid2sessionCountRDD).map { case (cid, (ccid, value)) => (cid, value) }// 第三步:分组取TopN算法实现,获取每个品类的top10活跃用户// 先按照品类分组val top10CategorySessionCountsRDD = top10CategorySessionCountRDD.groupByKey()// 将每一个品类的所有点击排序,取前十个,并转换为对象//(categoryid, sessionId=1213,sessionId=908)val top10SessionObjectRDD = top10CategorySessionCountsRDD.flatMap { case (categoryid, clicks) =>// 先排序,然后取前10val top10Sessions = clicks.toList.sortWith(_.split(",")(1) > _.split(",")(1)).take(10)// 重新整理数据top10Sessions.map { case line =>val sessionid = line.split(",")(0)val count = line.split(",")(1).toLongTop10Session(taskid, categoryid, sessionid, count)}}// 将结果以追加方式写入到MySQL中import spark.implicits._top10SessionObjectRDD.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "top10_session").option("user", ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()val top10SessionRDD = top10SessionObjectRDD.map(item => (item.sessionid, item.sessionid))// 第四步:获取top10活跃session的明细数据val sessionDetailRDD = top10SessionRDD.join(sessionid2ActionRDD).map { case (sid, (sessionid, userVisitAction)) =>SessionDetail(taskid, userVisitAction.user_id, userVisitAction.session_id,userVisitAction.page_id, userVisitAction.action_time, userVisitAction.search_keyword,userVisitAction.click_category_id, userVisitAction.click_product_id, userVisitAction.order_category_ids,userVisitAction.order_product_ids, userVisitAction.pay_category_ids, userVisitAction.pay_product_ids)}// 将活跃Session的明细数据,写入到MySQLsessionDetailRDD.toDF().write.format("jdbc").option("url", ConfigurationManager.config.getString(Constants.JDBC_URL)).option("dbtable", "session_detail").option("user", 
ConfigurationManager.config.getString(Constants.JDBC_USER)).option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)).mode(SaveMode.Append).save()}

10 Summary

Reviewing the old helps learn the new: this article pulls the code together as a comprehensive refresher. Please forgive the rough presentation.


Qin Kaixin, in Shenzhen
