admin管理员组文章数量:1584444
------此项目整理自《Spark Streaming 实时流式大数据处理实战》肖力涛
第8章 实时词频统计处理系统实战
原书源码地址:https://github/xlturing/spark-streaming-action/tree/master/code/第8章
本文源码地址:https://github/githubwaynefong/Projects_SparkStreaming/tree/master/实时词频统计系统
一、环境
开发环境:
系统:Win10
开发工具:scala-eclipse-IDE
项目管理工具:Maven 3.6.0
JDK 1.8
Scala 2.11.11
Spark (Streaming) 2.4.3
MySQL:mysql-connector-java-5.1.47
spark-streaming-kafka-0-8_2.11 (Spark Streaming 提供的Kafka集成接口)
注1. 末尾的2.11 代表scala版本;
注2. kafka-0-8 代表支持 kafka 0.8 及以上版本,此版本为老版本
注3. 集成接口使用方式,访问以下官方链接:
http://spark.apache/docs/latest/streaming-kafka-0-8-integration.html
Kafka_2.11-2.2.1 (2.11表示对应的scala版本,2.2.1表示kafka版本)
作业运行环境:
系统:Linux CentOS7(两台机:主从节点,2核)
master : 192.168.190.200
slave1 : 192.168.190.201
JDK 1.8
Scala 2.11.11
Spark 2.4.3
ZooKeeper 3.4.14
Kafka_2.11-2.2.1
MySQL 5.6.44 (位于master节点)
二、背景及系统设计图
参考背景:fsight舆情分析网站对游戏用户评论的词频统计功能 https://fsight.qq/Game/173#/outline
系统设计图:
三、项目结构图(Maven项目)
1. 模拟数据生成器项目
| 数据生成器 Producer:此处没用爬虫,重点回归到后面的流式词频统计上
| 数据消费测试 ConsumerTest
2. 分词服务
| 使用结巴分词,服务访问地址:"http://master:8282/token/" (位于master节点)
3. 流式词频统计项目
| dao层: 数据接入层模块(Kafka数据接入,MySQL数据接入)
| main层:程序主入口
| service层:具体的业务逻辑层(MySQL数据读出写入,分词服务)
| util层:放置工具类(配置信息,广播变量包装器,时间解析器)
四、代码实现
1. 模拟数据生成项目
1.1. 添加Kafka依赖:
<dependencies>
<dependency><!-- Kafka 依赖项 -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.2.1</version>
<exclusions><!-- 去掉引发冲突的包 -->
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>junit</artifactId>
<groupId>junit</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
1.2. 模拟数据生产者 Producer,并将消息发布到Kafka:
package sparkstreaming_action.kafka.producer
import scala.util.Random
import scala.io.Source
import java.util.Properties
import org.apache.kafkamon.serialization.StringSerializer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerConfig
// 用于生成模拟数据的生产者
object Producer extends App{
// 从命令行接收参数
val eventsNum = args(0).toInt // 评论事件数目
val topic = args(1) // 主题
val brokers = args(2) // 引导服务器列表
// 添加配置项
val props = new Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "kafkaDataProducer")
/** 注:不要用 classOf[StringSerializer].toString(),要用 .getName()
* toString() 输出:class org.apache.kafkamon.serialization.StringSerializer(多了开头class)
* getName() 输出:org.apache.kafkamon.serialization.StringSerializer
*/
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// 构建Kafka生产者
val producer = new KafkaProducer[String, String](props)
// 开始生产时间
val startTime = System.currentTimeMillis()
val rnd = new Random()
// 读取汉字字典
val source = Source.fromFile("./hanzi.txt")
val lines = try source.mkString finally source.close()
for (nEvents <- Range(0, eventsNum)) {
// 生成模拟评论数据 (user, comment)
val sb = new StringBuilder()
// 随机从字典中抽取200个以内汉字拼在一起
for (index <- Range(0, rnd.nextInt(200))) {
sb += lines.charAt(rnd.nextInt(lines.length()))
}
// 构建用户(100个以内)
val userName = "user_" + rnd.nextInt(100)
// 构建生产者记录
val data = new ProducerRecord[String, String](topic, userName, sb.toString())
//异步向Kafka发送记录
producer.send(data, new Callback() {
//实现发送完成后的回调方法
override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
if(e != null) {
e.printStackTrace();
} else {
println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}
// 计算每条记录的平均发送时间
println("sent per second: " + (eventsNum * 1000 / (System.currentTimeMillis() - startTime)))
producer.close()
}
2. 分词服务项目
2.1. 中文分词器 segmentor.py:
| 服务访问地址:"http://master:8282/token/" (位于master节点)
# -*- coding:UTF-8 -*-
import jieba
cut = jieba.cut
from bottle import route, run
# 利用结巴分词的切词函数
def token(sentence):
seg_list = list(cut(sentence))
return " ".join(seg_list)
# 路由地址/token/
@route('/token/:sentence')
def index(sentence):
result = token(sentence)
# 返回json格式的结果
return "{\"ret\":0, \"msg\":\"OK\", \"terms\":\"%s\"}" % result
if __name__ == "__main__":
# 以master:8282启动服务
run(host="master", port=8282)
| 访问示例:(192.168.190.200 为 master 节点 IP)
—| 最终以 Json 格式返回,并将词语以空格分隔
3. 流式词频统计项目
3.1. 添加依赖项:
| Spark
| Spark Streaming
| Spark Streaming Kafka 0-8 => http://spark.apache/docs/latest/streaming-kafka-integration.html(官网教程)
| MySQL => 连接包:mysql-connector-java-5.1.47
| C3P0 连接池
| JSON => spray-json包 => 源码地址:https://github/spray/spray-json (含使用说明)
| HTTP => scalaj-http包 => 官网:https://index.scala-lang/scalaj/scalaj-http/scalaj-http(含使用说明)
| Time Parse => joda-time包 => 源码地址:https://github/JodaOrg/joda-time(含使用说明)
| Log 日志包
<properties>
<scala.version>2.11</scala.version><!-- 设置变量指定Scala版本号 -->
<spark.version>2.4.3</spark.version><!-- 设置变量指定Spark版本号 -->
</properties>
<dependencies>
<dependency><!-- Spark依赖包 -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope><!-- 运行时提供,打包不添加,Spark集群已自带 -->
</dependency>
<dependency><!-- Spark Streaming依赖包 -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope><!-- 运行时提供,打包不添加,Spark集群已自带 -->
</dependency>
<dependency><!-- Spark Streaming with Kafka 依赖包 -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId><!-- 使用旧版API -->
<version>${spark.version}</version>
</dependency>
<dependency><!-- MySQL 依赖包 -->
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency><!-- MySQL 连接池依赖包 -->
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<dependency><!-- JSON 依赖包 -->
<groupId>io.spray</groupId>
<artifactId>spray-json_${scala.version}</artifactId>
<version>1.3.2</version>
</dependency>
<dependency><!-- HTTP 依赖包 -->
<groupId>org.scalaj</groupId>
<artifactId>scalaj-http_${scala.version}</artifactId>
<version>2.3.0</version>
</dependency>
<dependency><!-- Time Parse 时间解析依赖包 -->
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
<dependency><!-- Log 日志依赖包 -->
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency><!-- 日志依赖接口 -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>
</dependencies>
3.2. Kafka数据接入模块:
| Spark 流式作业作为消费者,订阅Kafka消息,使用低阶API直接读取数据流的方式获取数据(offsets用ZK保存)
| 创建直接数据流 DirectStream 前需要的准备:
—| 1)更新消费者在Kafka主题分区中的偏移量(offsets)至 Zookeeper
——| 1.1)如果之前消费过,需要判别此次获得的偏移是否过时,过时会使得消息被重复消费
——| 1.2)如果之前没有消费过,需要根据配置,设置为从头还是从尾开始消费
/** 创建数据流前,根据实际情况更新消费offsets
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
// 遍历topics
topics.foreach(topic => {
// 是否已被消费过
var hasConsumed = true
/** 从kafka获取指定主题下的所有分区
* 返回实例 Either[left, right],通常约定:left包装错误信息,right包装处理成功信息
* 1. 返回实例有两种:Left 和 Right ,都继承自 Either
* 2. 如果返回 Left 实例,则通过 父类 Either.isLeft 判断是否为Left实例,显然是 true,表示执行失败;
* 3. 如果返回 Right 实例,则通过 父类 Either.isLeft 判断是否为Left实例,显然是 false,表示执行成功;
* 此时可以通过 Either.right 来获取成功返回的信息。
*/
val partitionsE: Either[KafkaCluster.Err, Set[TopicAndPartition]]
= kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException("get kafka partition failed: "
+ s"${partitionsE.left.get.mkString("\n")}")
val partitions: Set[TopicAndPartition] = partitionsE.right.get
// 从kafka获取各分区上的消费者消费到的偏移offsets
val consumerOffsetsE: Either[KafkaCluster.Err, Map[TopicAndPartition, Long]]
= kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
hasConsumed = false // 没有消费过时,便获取不到消费偏移
log.info("consumerOffsetsE.isLeft: " + consumerOffsetsE.isLeft)
// 消费过的情况
if (hasConsumed) {
log.warn("消费过")
// 获取各分区起始位置偏移
val earliestLeaderOffsetsE
: Either[KafkaCluster.Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]]
= kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException("get earliest offsets failed: "
+ s"${earliestLeaderOffsetsE.left.get.mkString("\n")}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
/** 存放更新到起始位置的偏移offsets
* 可能只是存在部分分区consumerOffsets过时,
* 所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
* 默认定义不可变集合,插入新元素KV时会创建新的Map
*/
var offsets: Map[TopicAndPartition, Long] = Map()
/** 遍历消费偏移offsets集合
* 如果zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除,
* 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小。
* 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
* 这是把consumerOffsets更新为earliestLeaderOffsets
*/
consumerOffsets.foreach({
case (tp, n) => {
/** 获取消费分区tp上的起始偏移earliestLeaderOffset
* LeaderOffset数据结构:
* case class LeaderOffset(host: String, port: Int, offset: Long)
* TopicAndPartition数据结构:
* case class TopicAndPartition(topic: String, partition: Int)
*/
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
/** 如果消费偏移 小于 起始偏移,说明消费偏移已过时,需更新为起始偏移
* 否则,会重复消费消息
*/
if (n < earliestLeaderOffset) {
log.warn("consumer group:" + groupId + ",topic:" + tp.topic
+ ",partition:" + tp.partition + " offsets已经过时,更新为 "
+ earliestLeaderOffset)
// 加入需更新(已过世)的偏移offsets
offsets += (tp -> earliestLeaderOffset)
}
}
})
log.warn("offsets: " + offsets)
// 如果存在过时offsets,则更新到Kafka
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else { // 没有消费过的情况
log.warn("没有消费过")
/** 根据 auto.offset.reset 属性设置 更新偏移(头或尾)到Kafka
* 该属性指定:消费者在读取一个没有偏移量的分区或者偏移量无效的情况下
* (因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。
* 默认值:largest 从最新记录开始读取
* 另一个值:smallest 从起始位置读取分区记录
*/
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase())
var leaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = null
if (reset == Some("smallest")) {
leaderOffsets = kc.getEarliestLeaderOffsets(partitions).right.get
} else {
leaderOffsets = kc.getLatestLeaderOffsets(partitions).right.get
}
// 转换成指定格式
val offsets = leaderOffsets.map({
case (tp, n) => (tp, n.offset)
})
log.warn("offsets: " + offsets)
kc.setConsumerOffsets(groupId, offsets)
}
})
}
| 创建直接数据流 DirectStream:
/** 创建直接数据流
* @param ssc spark流式上下文
* @param kafkaParams kafka参数
* @param topic 主题
* @param K 键类型
* @param V 值类型
* @param KD 键反序列化类型
* @param VD 值反序列化类型
* @return 直接读取数据流
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String],
topics: Set[String]): InputDStream[(K, V)] = {
// 获取消费者组id
val groupId = kafkaParams.get("group.id").get
// 在ZK上读取offsets前,先根据实际情况更新offsets
setOrUpdateOffsets(topics, groupId)
//从ZK上读取offset开始消费message
val message: InputDStream[(K, V)] = {
// 获取主题下的分区
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException("get kafka partition failed: "
+ s"${partitionsE.left.get.mkString("\n")}")
val partitions = partitionsE.right.get
// 获取各分区上的消费偏移
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException("get kafka consumer offsets failed: "
+ s"${consumerOffsetsE.left.get.mkString("\n")}")
val consumerOffsets = consumerOffsetsE.right.get
// 创建直接读取数据流
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets,
(mmd: MessageAndMetadata[K, V]) => (mmd.key(), mmd.message()))
}
message
}
| 消息数据消费完后,需要将偏移量更新回ZooKeeper,以便下次消费从此处开始:
/** 消费成功后,更新ZK上的消费offsets
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
val groupId = kafkaParams.get("group.id").get
/** 获取从Kafka DirectStream 中 RDD 的各分区偏移(offset)范围列表
* OffsetRange: (分区的offset 起始from--终止until)
*/
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
// 向kafka发送成功消费的信号,并将读到的各分区的最新偏移(末尾偏移)更新回kafka
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
log.error(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
3.3. 分词服务模块:
| HTTP请求重试:
—| 分词服务访问使用HTTP请求,这可能会存在网络错误,导致访问失败
/** HTTP请求重试
* 出错原因:网络出错
* 尝试 n 次,若依然失败,则抛出异常
* @param n 尝试次数
* @param fn 执行函数体
* @return
*/
@annotation.tailrec
def retry[T](n: Int)(fn: => T): T = { // 使用了函数柯里化
util.Try { fn } match {
case util.Success(x) => x
case _ if n > 1 => {
log.warn(s"[retry ${n}]")
retry(n - 1)(fn)
}
case util.Failure(e) => {
log.error(s"[segError] API retry 3 times fail!", e)
throw e
}
}
}
| 使用HTTP请求,访问分词服务,解析返回的 JSON 结果:
/** 分词
* @param url: String 分词服务地址
* @param content: String 待分词内容
* @return HashSet[String] 词语集合
*/
def segment(url: String, content: String): HashSet[String] = {
// 请求开始时间
val timer = System.currentTimeMillis()
// 发送请求,等待响应
var response = Http(url + content).asString
// 计算响应时间(耗时时间)
val dur = System.currentTimeMillis() - timer
if (dur > 20) // 输出耗时较长的请求
log.warn(s"[longVisit]>>>>>> api: ${url}${content}\ttimer: ${dur}")
// 分词词语结果集
val words = HashSet[String]()
response.code match {
// 匹配响应成功信号
case 200 => {
// parseJson 将Json字符串转成Json语法树节点(Abstract Syntax Tree(AST) node)
// asJsObject 将Json AST 转成 Json对象,便于面向对象操作
response.body.parseJson.asJsObject.getFields("ret", "msg", "terms") match {
/** 匹配响应参数列表
* Seq() 默认实现了 List(广义表)
* val list = Seq(1,"a",3)
* println(list.head) //1
* println(list.tail) //List(a, 3)
*
* JsNumber/JsString 继承自 JsValue,包装名/值对
*/
case Seq(JsNumber(ret), JsString(msg), JsString(terms)) => {
if (ret.toInt != 0) { // 分词失败
log.error("[segmentRetError] visit api: "
+ s"${url}?content=${content}\tsegment error: ${msg}")
} else { // 分词成功
val tokens = terms.split(" ")
tokens.foreach(token => {
words += token // 词语插入集合
})
}
}
// 匹配失败,返回空集合
case _ => words
}
words
}
// 匹配响应异常信号
case _ => {
log.error("[segmentResponseError] visit api: "
+ s"${url}${content}\tresponse code: ${response.code}")
words
}
}
}
| 分词结果映射:
—| 按照指定词库,映射词库中词语在分词结果中的是否出现,若出现,以 (word, 1) 的格式记录,否则丢弃
/** 指定词 统计
* @param record: String 待分词记录
* @param wordDictionary: HashSet[String] 指定词语的词典
* @return
*/
def mapSegment(record: String, wordDictionary: HashSet[String]): Map[String, Int] = {
// 分词开始时间
val preTime = System.currentTimeMillis()
// 指定词 统计集合
val wordCount = Map[String, Int]()
if (record == null || record.isEmpty()) {
log.warn(s"record is empty.")
wordCount
} else {
// 分词服务地址
val postUrl = Conf.segmentorHost + "/token/"
try {
// 对记录分词
val wordsSet = retry(3)(segment(postUrl, record))
log.warn(s"[mapSegmentSuccess] record: ${record}\t"
+ s"time elapsed: ${System.currentTimeMillis() - preTime}")
// 按词库指定词 统计
wordDictionary.foreach(word => {
if (wordsSet.contains(word))
wordCount += word -> 1
})
wordCount
} catch {
case e: Exception =>
log.warn("[mapSegmentApiError] mapSegment error\t"
+ s"postUrl: ${postUrl}${record}", e)
wordCount
}
}
}
3.4 MySQL接入层
| 封装了 c3p0 连接池,用于减少频繁数据库连接的开销
/**
* Mysql连接池类(c3p0)
*/
class MysqlPool extends Serializable {
@transient lazy val log = LogManager.getLogger(this.getClass)
private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
private val conf = Conf.mysqlConfig
try {
cpds.setJdbcUrl(conf.get("url")
.getOrElse("jdbc:mysql://master:3306/spark?useUnicode=true&characterEncoding=UTF-8"))
cpds.setDriverClass("com.mysql.jdbc.Driver")
cpds.setUser(conf.get("username").getOrElse("hadoop"))
cpds.setPassword(conf.get("password").getOrElse("123456"))
cpds.setInitialPoolSize(3) // 初始连接数
cpds.setMaxPoolSize(Conf.maxPoolSize) // 连接池保留的最大连接数
cpds.setMinPoolSize(Conf.minPoolSize) // 连接池保留的最小连接数
cpds.setAcquireIncrement(5) // 连接数递增步长
cpds.setMaxStatements(180) // 最大缓存语句数
/** 最大空闲时间:
* 若25000秒内未使用则连接被丢弃;
* 若为0,则永久不丢弃。
* Default: 0
*/
cpds.setMaxIdleTime(25000)
// 检测连接查询(前提是表需要存在)
cpds.setPreferredTestQuery("select id from user_words limit 1")
//每18000秒(5h)检查连接池中的所有空闲连接,Default: 0
cpds.setIdleConnectionTestPeriod(18000)
} catch {
case e: Exception =>
log.error("[MysqlPoolError]", e)
}
// 获取连接
def getConnection: Connection = {
try {
return cpds.getConnection()
} catch {
case e: Exception =>
log.error("[MysqlPoolGetConnectionError]", e)
null
}
}
}
// 连接池单例
object MysqlManager {
@volatile private var mysqlPool: MysqlPool = _
def getMysqlPool: MysqlPool = {
if (mysqlPool == null) {
synchronized {
if (mysqlPool == null) {
mysqlPool = new MysqlPool
}
}
}
mysqlPool
}
}
3.5. MySQL服务层
| 加载指定用户词库 (表:user_words)
/** 加载用户词典
* @return HashSet[String]
*/
def getUserWords(): HashSet[String] = {
// 开始时间
val preTime = System.currentTimeMillis()
// sql查询
val sql = "select distinct(word) from user_words"
// 从连接池获取Mysql数据库连接
val conn = MysqlManager.getMysqlPool.getConnection
// 获取语句
val statement = conn.createStatement()
try {
// 执行查询,获取结果集(result set)
val rs = statement.executeQuery(sql)
// 存放结果词语
val words = HashSet[String]()
// 遍历结果集
while (rs.next()) {
words += rs.getString("word")
}
log.warn("[loadSuccess] load user_words from db "
+ s"count: ${words.size}\t"
+ s"time elapsed: ${System.currentTimeMillis() - preTime}")
words
} catch {
case e: Exception =>
log.error("[loadError] error: ", e)
null
} finally {
statement.close()
conn.close()
}
}
| 保存词频统计结果至MySQL数据库
—| 每个RDD分区记录批量执行一次数据库提交
/** 保存
* 按RDD分区批量执行插入语句
*/
def save(rdd: RDD[(String, Int)]): Unit = {
if (!rdd.isEmpty()) {
rdd.foreachPartition(partition => {
// 每个分区开始执行时间
val preTime = System.currentTimeMillis()
// 从连接池获取数据库连接
val conn = MysqlManager.getMysqlPool.getConnection
// 获取语句
val statement = conn.createStatement()
try {
conn.setAutoCommit(false) // 手动提交事务
partition.foreach((record: (String, Int)) => {
log.info(">>>>>>" + record)
// 创建时间
val createTime = System.currentTimeMillis()
/** 按 年月 创建分表word_count_yyyyMM
* 对 (word, date) 组合做唯一性校验
* 注1:scala中字符串换行书写时,需要把 "+" 号放在上一行的右侧
* 如:var sql = "abc" +
* "def"
* println(sql) // abcdef
* 注2:为了观看方便,写在左侧,但又不支持,
* 所以将字符串用括号套起来
* 如:var sql = ("abc"
* + "def")
* println(sql) // abcdef
*/
var sql = ("CREATE TABLE IF NOT EXISTS "
+ s"word_count_${TimeParse.timeStamp2String(createTime, "yyyyMM")} "
+ "(id int(11) NOT NULL AUTO_INCREMENT,"
+ " word varchar(64) NOT NULL,"
+ " count int(11) DEFAULT 0,"
+ " date date NOT NULL,"
+ " PRIMARY KEY (id),"
+ " UNIQUE KEY word (word, date)"
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8;")
statement.addBatch(sql)
/** 插入语句
* 对 (word, date) 组合出现重复(冲突)的插入,执行count值的累加更新
*/
sql = ("insert into "
+ s"word_count_${TimeParse.timeStamp2String(createTime, "yyyyMM")} "
+ "(word, count, date) values ("
+ s"'${record._1}', ${record._2},"
+ s"'${TimeParse.timeStamp2String(createTime, "yyyy-MM-dd")}'"
+ ") on duplicate key update count=count+values(count);")
statement.addBatch(sql)
log.warn(s"[recordAddBatchSuccess] record: ${record._1}, ${record._2}")
})
// 执行批次
statement.executeBatch()
// 提交批次事务
connmit()
// 每个分区的批次执行总时间
log.warn(s"[save_batchSaveSuccess] "
+ s"time elapsed: ${System.currentTimeMillis() - preTime}")
} catch {
case e: Exception =>
log.error("[save_batchSaveError]", e)
} finally {
statement.close()
conn.close()
}
})
}
}
3.6. 广播变量包装器(工具类):
| 用于广播用户词库到各个计算节点本地,减少频繁网络访问带来的开销
/**
* 广播变量包装器
* (支持运行时动态更新)
* @param ssc: StreamingContext 流式上下文
* @param _v: T 待广播数据
*/
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
/** 更新广播变量
* @param newValue: T 新的待广播数据
* @param blocking: Boolean 是否阻塞广播变量的使用,直到广播变量重新广播完成
*/
def update(newValue: T, blocking: Boolean = false): Unit = {
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
// 广播变量的数据
def value: T = v.value
// 序列化广播变量对象
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
// 反序列化广播变量对象
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
3.7. 程序入口:
| Spark Streaming 作业:(整合以上各个服务模块)
—| 1)作为Kafka消费者以直接数据流的方式消费消息数据
—| 2)利用分词服务解析消息,并按用户词库(广播词库,并周期更新)找出(映射)词是否出现(word,1)
—| 3)对解析结果 (word, 1) 执行聚合操作 reduceByKey,结果 => (word, count)
—| 4)输出最终聚合结果输出至MySQL数据库(支持对单个词语的累加计数)
—| 5)更新消息偏移至 ZK,告知此次消费成功(以便下次从此处开始消费)
/**
* 消费主程序
*/
object ConsumerMain extends Serializable {
@transient lazy val log = LogManager.getLogger(this.getClass)
def functionToCreateContext(): StreamingContext = {
// 设置Spark配置
val sparkConf = new SparkConf()
.setAppName("WordFreqConsumer")
.setMaster(Conf.master)
.set("spark.default.parallelism", Conf.parallelNum)
.set("spark.streaming.concurrentJobs", Conf.concurrentJobs)
.set("spark.executor.memory", Conf.executorMem)
.set("spark.cores.max", Conf.coresMax)
.set("spark.local.dir", Conf.localDir)
.set("spark.streaming.kafka.maxRatePerPartition", Conf.perMaxRate)
// 创建流式上下文
val ssc = new StreamingContext(sparkConf, Seconds(Conf.interval))
// ssc.checkpoint(Conf.localDir)
// 获取Kafka主题topics集合
val topics = Conf.topics.split(",").toSet
// 获取Kafka配置
val kafkaParams = Map[String, String](
"metadata.broker.list" -> Conf.brokers,
"auto.offset.reset" -> Conf.offsetReset,
"group.id" -> Conf.group)
// 创建Kafka数据管理层
val km = new KafkaManager(kafkaParams)
// 创建Kafka直接读取数据流:键值对格式 (元数据,消息)
val kafkaDirectStream = km.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)
log.warn(s"Initial Done***>>>topic:${Conf.topics}\t"
+ s"group:${Conf.group}\tlocalDir:${Conf.localDir}\t"
+ s"brokers:${Conf.brokers}")
// 缓存数据流
kafkaDirectStream.cache()
/** 加载词频统计词库
* 构建广播变量数据类型:
* (时间戳,词库) -- 时间戳用于定时更新时的时间计算
* (timestamp: Long, words: HashSet[String])
*/
val words = BroadcastWrapper[(Long, HashSet[String])](ssc,
(System.currentTimeMillis(), MysqlService.getUserWords()))
/** 对kafka每条消息进行分词操作
* 注:直接 kafkaDirectStream.flatMap 也能实现相同功能
* 不过定期更新词库的操作就需要在各个partition中(Worker)执行,
* 无法在Driver中执行,并且时间判断的频次也会增高
*/
val segmentedStream: DStream[(String, Int)] = kafkaDirectStream
.map(record => {
println("data: " + record)
record._2
})
.repartition(10)
.transform((rdd: RDD[String]) => { // 通过 transform 操作RDD
// Driver中执行
// 定期更新词库,更新完成前阻塞使用广播变量的进程
if (System.currentTimeMillis() - words.value._1 > Conf.updateFreq) {
words.update((System.currentTimeMillis(), MysqlService.getUserWords()), true)
log.warn("[BroadcastWrapper] words updated")
}
/** 对记录分词,并按词库统计
* RDD.flatMap[U](f: T => TraversableOnce[U]): RDD[U]
* 此处:T 为 String
* U 为 (String, Int)
* TraversableOnce 子类为 Map
*/
rdd.flatMap((record: String) => SegmentService.mapSegment(record, words.value._2))
})
// 按键(词)聚合,统计每个词的个数
val countedWordStream = segmentedStream.reduceByKey(_ + _)
// 将统计结果输出至MySQL数据库
countedWordStream.foreachRDD(MysqlService.save(_))
// 消费完一批消息(即:成功写入Mysql后),更新ZK中的 offsets
kafkaDirectStream.foreachRDD((rdd: RDD[(String, String)]) => {
if (!rdd.isEmpty()) {
km.updateZKOffsets(rdd)
}
})
ssc
}
/**
* 程序入口
*/
def main(args: Array[String]) {
val ssc = functionToCreateContext()
ssc.start()
ssc.awaitTermination()
}
}
五、启动相关服务,打包运行
--------------------------------------------------------------------------------------
**保证两台主机开启(master和slave1),然后启动如下软件:
master节点执行:
1. MySQL开机自启
2. 启动Spark:(master节点将开启Master守护进程,slave1节点将开启Worker守护进程)
$ /opt/spark-2.4.3-bin-hadoop2.7/sbin/start-all.sh
3. 启动分词服务:
$ python segmentor.py
Bottle v0.12.17 server starting up (using WSGIRefServer())...
Listening on http://master:8282/
Hit Ctrl-C to quit.
<非守护进程,不可退出,所以接下来的操作需要用另外的终端登陆master节点>
master和slave1节点执行:
1. 启动ZK:
$ zkServer.sh start
2. 启动Kafka:(-daemon 为隐藏启动日志,后面是启动的配置文件)
$ kafka-server-start.sh -daemon /opt/kafka_2.11-2.2.1/config/server.properties
--------------------------------------------------------------------------------------
**登陆master节点的MySQL数据库,初始化指定用户词库(此处以 百家姓 为例)
# 使用数据库模式 spark
USE spark;
# 用户词库
DROP TABLE IF EXISTS user_words;
CREATE TABLE IF NOT EXISTS user_words (
id bigint NOT NULL AUTO_INCREMENT,
word varchar(50) NOT NULL comment '统计关键词',
add_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '添加时间',
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
# 插入百家姓
insert into user_words (word) values
('赵'),('钱'),('孙'),('李'),('周'),('吴'),('郑'),('王'),
('冯'),('陈'),('褚'),('卫'),('蒋'),('沈'),('韩'),('杨'),
('朱'),('秦'),('尤'),('许'),('何'),('吕'),('施'),('张'),
('孔'),('曹'),('严'),('华'),('金'),('魏'),('陶'),('姜'),
('戚'),('谢'),('邹'),('喻'),('柏'),('水'),('窦'),('章'),
('云'),('苏'),('潘'),('葛'),('奚'),('范'),('彭'),('郎');
select * from user_words;
--------------------------------------------------------------------------------------
**作业打包提交开始运行:
在项目根目录(wordFreqKafkaMysql/)下打包:
$ mvn clean install
在 wordFreqKafkaMysql/target/ 下生成Jar包:wordFreqKafkaMysql-0.1-jar-with-dependencies.jar
将此Jar包上传至master节点的目录下
在项目根目录(kafkaDataProducer/)下打包:
$ mvn clean install
在 kafkaDataProducer/target/ 下生成Jar包:kafkaDataProducer-0.1-jar-with-dependencies.jar
将此Jar包上传至master节点的目录下
终端A 登陆 master节点执行(可用多个 Power Shell 连接)
1. 提交Spark Streaming作业:(master节点)
$ spark-submit --class sparkstreaming_action.wordfreq.main.ConsumerMain --num-executors 2 --conf spark.default.parallelism=1000 wordFreqKafkaMysql-0.1-jar-with-dependencies.jar
<会有如下输出:每5min中更新指定用户词库>
19/06/28 21:15:56 WARN KafkaManager: 消费过
19/06/28 21:15:56 WARN KafkaManager: offsets: Map()
19/06/28 21:15:56 WARN ConsumerMain$: Initial Done***>>>topic:test group:wordFreqGroup localDir:./tmp brokers:master:9092,slave1:9092
19/06/28 21:15:56 WARN MysqlService$: [loadSuccess] load user_words from db count: 48 time elapsed: 508
19/06/28 21:20:57 WARN MysqlService$: [loadSuccess] load user_words from db count: 48 time elapsed: 4
19/06/28 21:20:57 WARN ConsumerMain$: [BroadcastWrapper] words updated
终端B 登陆 master节点执行(可用多个 Power Shell 连接)
1. 启动模拟数据生成器:
$ java -cp kafkaDataProducer-0.1-jar-with-dependencies.jar sparkstreaming_action.kafka.producer.Producer 10000 test master:9092,slave1:9092
参数1:10000 代表用户评论数,多一点可以增加命中指定词库词语的概率
之前设置的太小,都无法命中,以至于无数据输出至MySQL
参数2:test 代表主题
参数3:master:9092,slave1:9092 代表Kafka的brokers(引导服务列表)
2. 启动模拟数据消费者测试:(可不运行,只是测试生产者数据的)
$ java -cp kafkaDataProducer-0.1-jar-with-dependencies.jar sparkstreaming_action.kafka.producer.Consumer master:9092,slave1:9092 testgroup test
3. 登陆 MySQL 查看是否有 word_count_yyyyMM 格式的分表创建
以下是查询结果,表示词库词汇出现在模拟数据中的次数
<至此,整个流程运行成功!>
mysql> select * from spark.word_count_201906;
+-----+------+-------+------------+
| id | word | count | date |
+-----+------+-------+------------+
| 1 | 吕 | 7 | 2019-06-28 |
| 2 | 冯 | 3 | 2019-06-28 |
| 3 | 沈 | 6 | 2019-06-28 |
| 4 | 李 | 10 | 2019-06-28 |
| 5 | 彭 | 13 | 2019-06-28 |
| 6 | 孔 | 5 | 2019-06-28 |
| 7 | 谢 | 3 | 2019-06-28 |
| 8 | 钱 | 31 | 2019-06-28 |
| 9 | 金 | 6 | 2019-06-28 |
| 10 | 朱 | 11 | 2019-06-28 |
| 12 | 郎 | 5 | 2019-06-28 |
| 13 | 陶 | 3 | 2019-06-28 |
| 14 | 章 | 5 | 2019-06-28 |
| 15 | 孙 | 4 | 2019-06-28 |
| 16 | 施 | 2 | 2019-06-28 |
| 17 | 赵 | 4 | 2019-06-28 |
| 18 | 魏 | 9 | 2019-06-28 |
| 19 | 秦 | 6 | 2019-06-28 |
| 21 | 杨 | 9 | 2019-06-28 |
| 22 | 云 | 5 | 2019-06-28 |
| 23 | 苏 | 5 | 2019-06-28 |
| 25 | 陈 | 7 | 2019-06-28 |
| 27 | 韩 | 10 | 2019-06-28 |
| 41 | 曹 | 5 | 2019-06-28 |
| 42 | 张 | 4 | 2019-06-28 |
| 49 | 郑 | 5 | 2019-06-28 |
| 50 | 潘 | 3 | 2019-06-28 |
| 53 | 蒋 | 3 | 2019-06-28 |
| 56 | 尤 | 7 | 2019-06-28 |
| 57 | 王 | 1 | 2019-06-28 |
| 58 | 严 | 3 | 2019-06-28 |
| 61 | 何 | 1 | 2019-06-28 |
| 71 | 许 | 2 | 2019-06-28 |
| 81 | 邹 | 5 | 2019-06-28 |
| 88 | 周 | 1 | 2019-06-28 |
| 93 | 葛 | 3 | 2019-06-28 |
| 101 | 戚 | 1 | 2019-06-28 |
| 102 | 水 | 4 | 2019-06-28 |
| 103 | 姜 | 5 | 2019-06-28 |
| 123 | 吴 | 5 | 2019-06-28 |
| 212 | 柏 | 1 | 2019-06-28 |
+-----+------+-------+------------+
六、Spark UI
1. 通过Spark UI 可以查看 Streaming 作业的运行情况:
2. 通过Spark UI 可以查看 Executors 的运行情况:
七、参考文章
1.《Spark Streaming 实时流式大数据处理实战》第8章 实时词频统计处理系统实战
2.《Kafka 权威指南》第4章 Kafka消费者
3. Spark 配置信息详解 configuration.html(Spark官网)
4.【源码追踪】SparkStreaming 中用 Direct 方式每次从 Kafka 拉取多少条数据(offset取值范围)
5. kafka之consumer参数auto.offset.reset 0.10+
6. c3p0连接池
版权声明:本文标题:《Spark实时词频统计处理系统》 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/dongtai/1726761466a1083286.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论