Spark综合使用及用户行为案例广告点击量实时统计分析实战

编程入门 行业动态 更新时间:2024-10-23 19:19:04

Spark综合使用及用户行为案例广告点击量实时<a href=https://www.elefans.com/category/jswz/34/1763856.html style=统计分析实战"/>

Spark综合使用及用户行为案例广告点击量实时统计分析实战

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq,如有任何技术交流,可随时联系。

1 广告点击数据模型

1.1 数据格式

    timestamp 	  province 	  city        userid         adid
复制代码

1.2 生成动态黑名单

    数据格式(timestamp province city userid adid)统计单用户的统计次数(date, userid,adid,clickCount)阈值统计统计黑名单
复制代码

1.3 计算广告点击流量实时统计结果

    输入数据格式(userid, timestamp province city userid adid)计算后数据格式并持久化(date,province,city,adid,clickCount)
复制代码

1.4 实时统计每天每个省份top3热门广告

    输入数据格式(yyyyMMdd_province_city_adid,clickCount)计算后数据格式并持久化(date,province, adid,clickCount)注册成表ROW_NUMBER()实现窗聚合tmp_daily_ad_click_count_by_prov
复制代码

1.5 实时统计每天每个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

    输入数据格式(timestamp province city userid adid)计算后数据格式并持久化(date,hour,minute,adid,clickCount)
复制代码

2 具体技术实现

2.1 SparkStreaming 与Kafka对接

  • 构建Spark上下文

      val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")// 创建Spark客户端val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sc = spark.sparkContextval ssc = new StreamingContext(sc, Seconds(5))// 获取Kafka配置val broker_list = ConfigurationManager.config.getString("kafka.broker.list")val topics = ConfigurationManager.config.getString("kafka.topics")
    复制代码
  • kafka消费者配置

     val kafkaParam = Map("bootstrap.servers" -> broker_list,//用于初始化链接到集群的地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],//用于标识这个消费者属于哪个消费团体"group.id" -> "commerce-consumer-group",//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性//可以使用这个配置,latest自动重置偏移量为最新的偏移量//earilist:提交过分区,从Offset处读取,如果没有提交过offset,从头读取//latest:提交过分区,从Offset处读取,没有从最新的数据开始读取//None:如果没有提交offset,就会报错,提交过offset,就从offset处读取"auto.offset.reset" -> "latest",//如果是true,则这个消费者的偏移量会在后台自动提交"enable.automit" -> (false: java.lang.Boolean))
    复制代码
  • 设置检查点目录

      ssc.checkpoint("./streaming_checkpoint")
    复制代码
  • LocationStrategies 分配分区策略

      // 创建DStream,返回接收到的输入数据// LocationStrategies:根据给定的主题和集群地址创建consumer// LocationStrategies.PreferConsistent:持续的在所有Executor之间匀分配分区 (均匀分配,选中的每一个Executor都会分配 partition)// LocationStrategies.PreferBrokers: 如果executor和kafka brokers 在同一台机器上,选择该executor。// LocationStrategies.PreferFixed: 如果机器不是均匀的情况下,可以指定特殊的hosts。当然如果不指定,采用 LocationStrategies.PreferConsistent模式
    复制代码
  • ConsumerStrategies 消费策略

      // ConsumerStrategies:选择如何在Driver和Executor上创建和配置Kafka Consumer// ConsumerStrategies.Subscribe:订阅一系列主题val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
    复制代码
  • SparkStreaming开始消费

      var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
    复制代码

2.2 SparkStreaming 开始根据黑名单进行过滤

  • 算法过程如下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)

  • 根据userId进行过滤

      用于Kafka Stream的线程非安全问题,重新分区切断血统adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {// 刚刚接受到原始的用户点击行为日志之后// 根据mysql中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)// 使用transform算子(将dstream中的每个batch RDD进行处理,转换为任意的其他RDD,功能很强大)val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>//格式 :timestamp province city userid adid//某个时间点 某个省份 某个城市 某个用户 某个广告// 首先,从mysql中查询所有黑名单用户,将其转换为一个rddval adBlacklists = AdBlacklistDAO.findAll()// (userid, timestamp province city userid adid)val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))//格式 :timestamp province city userid adidval mappedRDD = consumerRecordRDD.map(consumerRecord => {val userid = consumerRecord.split(" ")(3).toLong(userid,consumerRecord)})// 将原始日志数据rdd,与黑名单rdd,进行左外连接// 如果说原始日志的userid,没有在对应的黑名单中,join不到,左外连接// 用inner join,内连接,会导致数据丢失val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>// 如果这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户if(black.isDefined && black.get) false else true}filteredRDD.map{ case (userid,(log, black)) => (userid, log)}}filteredAdRealTimeLogDStream
    }
    复制代码

2.3 SparkStreaming 生成动态黑名单

  • 转化为用户粒度进行过滤,抛弃 province city ,格式为:(date, userid,adid,clickCount)

  • 过滤次数大于阈值的userId,持久化到磁盘。

      generateDynamicBlacklist(filteredAdRealTimeLogDStream)def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {// (timestamp province city userid adid)// 计算出每5个秒内的数据中,每天每个用户每个广告的点击量// 通过对原始实时日志的处理// 将日志的格式处理成<yyyyMMdd_userid_adid, 1L>格式val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>// 从tuple中获取到每一条原始的实时日志val logSplited = log.split(" ")// 提取出日期(yyyyMMdd)、userid、adidval timestamp = logSplited(0)val date = new Date(timestamp.toLong)val datekey = DateUtils.formatDateKey(date)val userid = logSplited(3).toLongval adid = logSplited(4)// 拼接keyval key = datekey + "_" + userid + "_" + adid(key, 1L)}// 针对处理后的日志格式,执行reduceByKey算子即可,(每个batch中)每天每个用户对每个广告的点击量val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)// 源源不断的,每个5s的batch中,当天每个用户对每支广告的点击次数// <yyyyMMdd_userid_adid, clickCount>dailyUserAdClickCountDStream.foreachRDD{ rdd =>rdd.foreachPartition{ items =>// 对每个分区的数据就去获取一次连接对象// 每次都是从连接池中获取,而不是每次都创建// 写数据库操作,性能已经提到最高了val adUserClickCounts = ArrayBuffer[AdUserClickCount]()for(item <- items){val keySplited = item._1.split("_")val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))// yyyy-MM-ddval userid = keySplited(1).toLongval adid = keySplited(2).toLongval clickCount = item._2//批量插入adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)}AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)}}// 现在我们在mysql里面,已经有了累计的每天各用户对各广告的点击量// 遍历每个batch中的所有记录,对每条记录都要去查询一下,这一天这个用户对这个广告的累计点击量是多少// 从mysql中查询// 查询出来的结果,如果是100,如果你发现某个用户某天对某个广告的点击量已经大于等于100了// 那么就判定这个用户就是黑名单用户,就写入mysql的表中,持久化val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>val keySplited = key.split("_")// yyyyMMdd -> yyyy-MM-ddval date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))val userid = keySplited(1).toLongval adid = keySplited(2).toLong// 从mysql中查询指定日期指定用户对指定广告的点击量val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)// 判断,如果点击量大于等于100,ok,那么不好意思,你就是黑名单用户// 那么就拉入黑名单,返回trueif(clickCount >= 100) {true}else{// 反之,如果点击量小于100的,那么就暂时不要管它了false}} 
    复制代码

2.4 计算广告点击流量实时统计结果(yyyyMMdd_province_city_adid,clickCount)

  • 转化为省城市粒度进行过滤,抛弃userId,格式为:(yyyyMMdd_province_city_adid,clickCount)

      val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {// 计算每天各省各城市各广告的点击量// 设计出来几个维度:日期、省份、城市、广告// 2015-12-01,当天,可以看到当天所有的实时数据(动态改变),比如江苏省南京市// 广告可以进行选择(广告主、广告名称、广告类型来筛选一个出来)// 拿着date、province、city、adid,去mysql中查询最新的数据// 等等,基于这几个维度,以及这份动态改变的数据,是可以实现比较灵活的广告点击流量查看的功能的// date province city userid adid// date_province_city_adid,作为key;1作为value// 通过spark,直接统计出来全局的点击次数,在spark集群中保留一份;在mysql中,也保留一份// 我们要对原始数据进行map,映射成<date_province_city_adid,1>格式// 然后呢,对上述格式的数据,执行updateStateByKey算子// spark streaming特有的一种算子,在spark集群内存中,维护一份key的全局状态//(userid, timestamp province city userid adid)val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>val logSplited = log.split(" ")val timestamp = logSplited(0)val date = new Date(timestamp.toLong)val datekey = DateUtils.formatDateKey(date)val province = logSplited(1)val city = logSplited(2)val adid = logSplited(4).toLongval key = datekey + "_" + province + "_" + city + "_" + adid(key, 1L)}// 在这个dstream中,就相当于,有每个batch rdd累加的各个key(各天各省份各城市各广告的点击次数)// 每次计算出最新的值,就在aggregatedDStream中的每个batch rdd中反应出来val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>// 举例来说// 对于每个key,都会调用一次这个方法// 比如key是<20151201_Jiangsu_Nanjing_10001,1>,就会来调用一次这个方法7// 10个// values,(1,1,1,1,1,1,1,1,1,1)// 首先根据optional判断,之前这个key,是否有对应的状态var clickCount = 0L// 如果说,之前是存在这个状态的,那么就以之前的状态作为起点,进行值的累加if(old.isDefined) {clickCount = old.get}// values,代表了,batch rdd中,每个key对应的所有的值for(value <- values) {clickCount += value}Some(clickCount)}// 将计算出来的最新结果,同步一份到mysql中,以便于j2ee系统使用aggregatedDStream.foreachRDD{ rdd =>rdd.foreachPartition{ items =>//批量保存到数据库val adStats = ArrayBuffer[AdStat]()for(item <- items){val keySplited = item._1.split("_")val date = keySplited(0)val province = keySplited(1)val city = keySplited(2)val adid = keySplited(3).toLongval clickCount = item._2adStats += AdStat(date,province,city,adid,clickCount)}AdStatDAO.updateBatch(adStats.toArray)}}aggregatedDStream
    }
    复制代码

2.5 实时统计每天每个省份top3热门广告

  • 转化为省粒度进行过滤,抛弃userId ,cityid,格式为:(yyyyMMdd_province_adid,clickCount)

  • 注册成表,基于ROW_NUMBER()实现窗聚合,按照province分区,实现top3排序,

     tmp_daily_ad_click_count_by_provcalculateProvinceTop3Ad(spark,adRealTimeStatDStream)def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {// 每一个batch rdd,都代表了最新的全量的每天各省份各城市各广告的点击量//(yyyyMMdd_province_city_adid,clickCount)val rowsDStream = adRealTimeStatDStream.transform{ rdd =>// <yyyyMMdd_province_city_adid, clickCount>// <yyyyMMdd_province_adid, clickCount>// 计算出每天各省份各广告的点击量val mappedRDD = rdd.map{ case (keyString, count) =>val keySplited = keyString.split("_")val date = keySplited(0)val province = keySplited(1)val adid = keySplited(3).toLongval clickCount = countval key = date + "_" + province + "_" + adid(key, clickCount)}val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )// 将dailyAdClickCountByProvinceRDD转换为DataFrame// 注册为一张临时表// 使用Spark SQL,通过开窗函数,获取到各省份的top3热门广告val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>val keySplited = keyString.split("_")val datekey = keySplited(0)val province = keySplited(1)val adid = keySplited(2).toLongval clickCount = countval date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))(date, province, adid, clickCount)}import spark.implicits._val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")// 将dailyAdClickCountByProvinceDF,注册成一张临时表dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")// 使用Spark SQL执行SQL语句,配合开窗函数,统计出各身份top3热门的广告val provinceTop3AdDF = spark.sql("SELECT "+ "date,"+ "province,"+ "ad_id,"+ "click_count "+ "FROM ( "+ "SELECT "+ "date,"+ "province,"+ "ad_id,"+ "click_count,"+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "+ "FROM tmp_daily_ad_click_count_by_prov "+ ") t "+ "WHERE rank<=3")provinceTop3AdDF.rdd}// 每次都是刷新出来各个省份最热门的top3广告,将其中的数据批量更新到MySQL中rowsDStream.foreachRDD{ rdd =>rdd.foreachPartition{ items =>// 插入数据库val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()for (item <- items){val date = item.getString(0)val province = item.getString(1)val adid = item.getLong(2)val clickCount = item.getLong(3)adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)}AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)}}
    复制代码

    }

2.6 实时统计每天每个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

  • 版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq,如有任何技术交流,可随时联系。

  • 转化为时间粒度进行过滤,抛弃province,userId ,cityid,格式为: <yyyyMMddHHMM_adid,1L>,基于reduceByKeyAndWindow进行聚合

  • 最终结果展开 (date,hour,minute,adid,clickCount)

      calculateAdClickCountByWindow(adRealTimeValueDStream)def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {// 映射成<yyyyMMddHHMM_adid,1L>格式//(timestamp province city userid adid)val pairDStream = adRealTimeValueDStream.map{ case consumerRecord  =>val logSplited = consumerRecord.split(" ")val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))val adid = logSplited(4).toLong(timeMinute + "_" + adid, 1L)}// 计算窗口函数,1小时滑动窗口内的广告点击趋势val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))// 最近1小时内,各分钟的点击量,并保存到数据库aggrRDD.foreachRDD{ rdd =>rdd.foreachPartition{ items =>//保存到数据库val adClickTrends = ArrayBuffer[AdClickTrend]()for (item <- items){val keySplited = item._1.split("_")// yyyyMMddHHmmval dateMinute = keySplited(0)val adid = keySplited(1).toLongval clickCount = item._2val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))val hour = dateMinute.substring(8, 10)val minute = dateMinute.substring(10)adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)}AdClickTrendDAO.updateBatch(adClickTrends.toArray)}}
    }
    复制代码

3 总结

温故而知新,本文为了综合复习,进行代码总结,内容粗鄙,勿怪

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq,如有任何技术交流,可随时联系。

秦凯新 于深圳

来源:掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

更多推荐

Spark综合使用及用户行为案例广告点击量实时统计分析实战

本文发布于:2024-02-14 14:02:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1763861.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:统计分析   实战   实时   案例   用户

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!