Structured Streaming + Kafka + MySQL (Spark Real-Time Computing)


Table of Contents

  • Preface
  • 1. Business Requirements Overview
  • 2. Project Code
    • 1. Simulating Transaction Data
    • 2. Creating the Maven Module
      • Project structure
    • 3. Core Code
  • Summary


Preface

Every year during Tmall's Double Eleven shopping festival, a giant real-time "battle dashboard" shows the current sales figures. Behind that flashy page sits some very solid engineering, and the scenario it represents is exactly real-time report analysis.


1. Business Requirements Overview

Simulate transaction order data and send it to the distributed message queue Kafka, then consume the order data in real time for analysis and processing. The business flow diagram is as follows:


Consume transaction order data from Kafka in real time, compute the sales order amount across different dimensions in real time, and store the final report results in a MySQL database.
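The reports land in MySQL tables with two columns, type and totalMoney. Below is a minimal DDL sketch, assuming the database db_spark and the table names used by the sink code later in this post; since the sink uses SaveMode.Overwrite, Spark's JDBC writer will drop and recreate these tables on every batch, so pre-creating them is optional:

-- Hypothetical DDL; column names mirror the report queries (type, totalMoney).
CREATE DATABASE IF NOT EXISTS db_spark;

CREATE TABLE IF NOT EXISTS db_spark.tb_ordertotal (
    type       VARCHAR(50)    NOT NULL,
    totalMoney DECIMAL(20, 2) NOT NULL
);

CREATE TABLE IF NOT EXISTS db_spark.tb_ordertotalprovince LIKE db_spark.tb_ordertotal;
CREATE TABLE IF NOT EXISTS db_spark.tb_ordertotalcity     LIKE db_spark.tb_ordertotal;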

2. Project Code

1. Simulating Transaction Data

Write a program that continuously generates transaction order data, converts each record to a JSON string with the json4s library, and sends it to a Kafka topic. The code is as follows:

// =================================== Order entity class ===================================
package cn.itcast.spark.mock

/**
 * Order entity class (Case Class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)

// =================================== Mock order data ===================================
package cn.itcast.spark.mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Simulate order data and send it to a Kafka topic.
 *   Each message in the topic is a String in JSON format.
 * Data conversion:
 *   OrderRecord instances are converted to JSON strings (using the json4s library).
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer configuration
      val props = new Properties()
      props.put("bootstrap.servers", "node1.itcast:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. Create the KafkaProducer with the configuration
      producer = new KafkaProducer[String, String](props)

      // Random number generator
      val random: Random = new Random()
      // Order status: open 0, cancelled 1, closed 2, completed 3
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders to generate in this batch
        val batchNumber: Int = random.nextInt(1) + 5
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Convert to a JSON string
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)
          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("orderTopic", orderJson)
          // 5. Send the record to the topic
          producer.send(record)
        }
        Thread.sleep(random.nextInt(100) + 500)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Format the given time ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    fastFormat.format(time) // format the date
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
    )
    // Pick a random range, then a random address within it
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int IP address to dotted IPv4 notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a String ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address
    buffer.mkString(".")
  }
}

2. Creating the Maven Module

Create a Maven module and add the relevant dependencies; the details are as follows:

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>/</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <kafka.version>2.0.0</kafka.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>

    <dependencies>
        <!-- Scala language -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming + Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Kafka Client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- IP-to-region lookup (resolve IP to province/city) -->
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- MySQL Client -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- JSON parsing: fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The project structure is as follows:
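The original post showed the structure as a screenshot. A plausible layout, reconstructed from the package names and resource paths that appear in the code (the module name itself is an assumption), looks like this:

spark-realtime-report/
├── pom.xml
└── src/
    └── main/
        ├── dataset/
        │   └── ip2region.db
        ├── resources/
        └── scala/
            └── cn/itcast/spark/
                ├── mock/
                │   ├── OrderRecord.scala
                │   └── MockOrderProducer.scala
                └── report/
                    └── RealTimeOrderReport.scala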

3. Core Code

RealTimeOrderReport.scala

package cn.itcast.spark.report

import java.util.concurrent.TimeUnit

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}

object RealTimeOrderReport {

  /** Debug helper: print a streaming DataFrame to the console */
  def printToConsole(dataFrame: DataFrame) = {
    dataFrame.writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .option("numRows", "50")
      .option("truncate", "false")
      .start()
  }

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("isDemo")
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    import spark.implicits._

    // Read the order data from Kafka as a streaming DataFrame
    val dataFrame: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.itcast:9092")
      .option("subscribe", "orderTopic")
      .load()
      .selectExpr("CAST (value AS STRING)")
    // printToConsole(dataFrame)

    // UDF: resolve an IP address to (province, city) using ip2region
    val ip_to_region: UserDefinedFunction = udf((ip: String) => {
      // 1. Create a DbSearcher pointing at the ip2region dictionary file
      val dbSearcher = new DbSearcher(new DbConfig(), "src/main/dataset/ip2region.db")
      // 2. Look up the IP address
      val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
      // 3. Extract the province and city
      val region: String = dataBlock.getRegion
      // println(region)  // e.g. 中国|0|海南省|海口市|教育网
      val Array(_, _, province, city, _) = region.split("\\|")
      (province, city)
    })

    // Parse the JSON fields, keep only orders with status 0, and resolve the region
    val frame: DataFrame = dataFrame
      .select(
        get_json_object($"value", "$.ip").as("ip"),
        get_json_object($"value", "$.orderMoney")
          .cast(DataTypes.createDecimalType(10, 2)).as("money"),
        get_json_object($"value", "$.orderStatus").as("status")
      )
      .filter($"status" === 0)
      .withColumn("region", ip_to_region($"ip"))
      .select(
        $"region._1".as("province"),
        $"region._2".as("city"),
        $"money"
      )
    // printToConsole(frame)

    // Aggregate with SQL over a temporary view:
    //   national total, per-province total, and per-city total for major cities
    frame.createOrReplaceTempView("tmp_view")
    val f: DataFrame = spark.sql(
      """
        |SELECT "国家" as type, SUM(money) as totalMoney FROM tmp_view
      """.stripMargin)
    val f2: DataFrame = spark.sql(
      """
        |SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province
      """.stripMargin)
    val f3: DataFrame = spark.sql(
      """
        |SELECT city as type, SUM(money) as totalMoney FROM
        |  (SELECT * FROM tmp_view WHERE city in ("北京市", "上海市", "深圳市", "广州市", "杭州市", "成都市", "南京市", "武汉市", "西安市")) t
        |GROUP BY t.city
      """.stripMargin)
    // printToConsole(f3)

    saveToMySQL(f, "total")
    saveToMySQL(f2, "totalprovince")
    saveToMySQL(f3, "totalcity")

    spark.streams.awaitAnyTermination()
  }

  /** Write each micro-batch of a report DataFrame to MySQL */
  def saveToMySQL(streamDF: DataFrame, reportType: String) = {
    streamDF.writeStream
      .outputMode(OutputMode.Complete())
      .queryName(s"${reportType}")
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        batchDF.coalesce(1)
          .write
          .mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("url", "jdbc:mysql://node1.itcast:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("user", "root")
          .option("password", "123456")
          .option("dbtable", s"db_spark.tb_order${reportType}")
          .save()
      })
      .option("checkpointLocation", s"datas/spark/structured-ckpt-${System.currentTimeMillis()}")
      .start()
  }
}
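The three report queries above are written in SQL against the temporary view. For comparison, here is a minimal DSL sketch of the per-province report (the equivalent of f2); it assumes the same streaming frame with columns province, city, and money and the same imports as the listing above:

// Illustrative DSL equivalent of the per-province SQL report (f2)
val provinceReport: DataFrame = frame
  .groupBy($"province")
  .agg(sum($"money").as("totalMoney"))
  .withColumnRenamed("province", "type")

// It could then be written with the same sink: saveToMySQL(provinceReport, "totalprovince")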

OrderRecord.scala

package cn.itcast.spark.mock

/**
 * Order entity class (Case Class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)
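To make the message format concrete, here is a small standalone sketch that serializes one OrderRecord with json4s, exactly as MockOrderProducer does; the field values are purely illustrative and the demo object name is made up for this example:

package cn.itcast.spark.mock

import org.json4s.DefaultFormats
import org.json4s.jackson.Json

object OrderRecordJsonDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative values only; real records are generated randomly by MockOrderProducer
    val record = OrderRecord(
      orderId = "20211111010203123000001",
      userId = "300000042",
      orderTime = "2021-11-11 01:02:03.123",
      ip = "106.80.12.34",
      orderMoney = 123.45,
      orderStatus = 0
    )
    // Prints one JSON object whose keys are the six OrderRecord fields
    println(new Json(DefaultFormats).write(record))
  }
}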

Summary

Real-time report analysis has become one of the most common reporting approaches in recent years. Its flagship application is the real-time dashboard: streaming computation produces results continuously and pushes them straight to the front-end application, so changes in key metrics are visible the moment they happen.

The classic example is Taobao's Double Eleven. Every year, beyond the shopping frenzy itself, the most eye-catching part of the event is the gross merchandise value ticking upward on the Double Eleven dashboard. Across the whole pipeline, from placing an order on Tmall through data collection, computation, and validation to the final display on the dashboard, the end-to-end latency is compressed to under five seconds; peak throughput reaches hundreds of thousands of orders per second, and multiple redundant streaming pipelines ensure nothing is lost.

This Double Eleven real-time report walkthrough was written mainly in SQL rather than the DataFrame DSL, which is something left to improve (a rough DSL sketch of the per-province report is included after the core code above for comparison). That wraps up this Tmall Double Eleven real-time report. If you found it useful, please like, comment, and share!
