大数据高级开发工程师——Spark学习笔记(10)

编程入门 行业动态 更新时间:2024-10-26 12:34:51

大数据高级开发工程师——Spark<a href=https://www.elefans.com/category/jswz/34/1770117.html style=学习笔记(10)"/>

大数据高级开发工程师——Spark学习笔记(10)

文章目录

  • Spark内存计算框架
    • Spark Streaming
      • Checkpoint
        • 1. checkpoint的基本介绍
        • 2. 什么时候需要使用checkpoint
        • 3. 如何使用checkpoint
      • Spark Streaming和Spark SQL整合
      • Spark Streaming容错
        • 1. 节点失败容错
        • 2. 数据丢失如何处理
        • 3. Task执行很慢容错
      • 优雅关闭
      • Spark Streaming整合kafka
        • 1. Spark Streaming整合kafka-0-8
          • 方式一:Receiver-based Approach【不推荐使用】
          • 方式二:Direct Approach(NoReceivers)
        • 2. Spark Streaming与kafka-0-10整合
        • 3. 解决SparkStreaming与kafka-0.8版本正好数据不丢失方案
        • 4. Spark Streaming如何保证exactly-once

Spark内存计算框架

Spark Streaming

Checkpoint

1. checkpoint的基本介绍

  • checkpoint 是 SparkStreaming 当中为了解决流式处理程序意外停止造成的数据丢失问题,checkpoint 的目的是保证长时间运行的任务在意外挂掉之后,能够在拉起来时不丢失数据。
  • checkpoint 中包含两种数据:
    • metadata 元数据信息:用户恢复 Driver 端的数据
      • 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统,用来恢复 Driver,元数据包括:配置(用户创建该 Streaming Application的所有配置)、DStream(一系列的操作)、未完成的batches(那些提交了 job 但尚未执行或未完成的batches)
    • data:保存已生成的 RDDs 至可靠的存储;这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖随时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。

2. 什么时候需要使用checkpoint

  • 满足以下任一条件:
    • 使用了 stateful 转换:如果 Application 中使用了 updateStateByKey 或 reduceByKeyAndWindow 等 stateful 操作,必须提供 checkpoint 目录来允许定时的 RDD 进行 checkpoint。
    • 希望从意外中恢复 Driver
  • 如果 Streaming App 没有 stateful 操作,也允许 Driver 挂掉之后再次重启的进度丢失,就没有启用 checkpoint 的必要了。

3. 如何使用checkpoint

  • 启用 checkpoint,需要设置一个支持容错的、可靠的文件系统(如HDFS、S3等)目录来保存 checkpoint 数据。
  • 通过调用 streamingContext.checkpoint(checkpointDirectory) 来完成,另外,如果你想让你的 Application 能从 Driver 失败中恢复,你的 Application 要满足:
    • 若 Application 为首次重启,将创建一个新的 StreamContext 实例;
    • 若 Application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例;
    • 通过 StreamingContext.getOrCreate 可以达到目的。
  • checkpoint 不仅仅可以保存运行结果中的数据,还可以存储 Driver 端的信息,通过 checkpoint 可以实现 Driver 端的高可用。代码实现如下:
object Case09_DriverHAWordCount {val checkpointPath = "hdfs://node01:8020/checkpoint"def creatingFunc(): StreamingContext = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint(checkpointPath)val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)result.print()ssc}/*** @param currentValues  表示在当前批次每个单词出现的所有的1 (hadoop,1)、(hadoop,1)、...、(hadoop,1)* @param historyValues  表示在之前所有批次中每个单词出现的总次数 (hadoop,100)*/def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {val newValue: Int = currentValues.sum + historyValues.getOrElse(0)Some(newValue)}def main(args: Array[String]): Unit = {val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, creatingFunc _)ssc.start()ssc.awaitTermination()}
}
  • 如果 checkpointDirectory 存在,那么 context 将导入 checkpoint 数据;如果目录不存在,函数 functionToCreateContext 将被调用并创建新的 context。
  • 除了调用 getOrCreate 外,还需要你的集群模式执行 Driver 挂掉之后重启之。
    • 例如,在 yarn 模式下,Driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点启动一个新的 ApplicationMaster。
  • 需要注意的是,随着 Streaming Application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置 checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的 5~10 倍。

Spark Streaming和Spark SQL整合

  • pom.xml 文件添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version>
</dependency>
  • 代码开发:
object Case10_WordCountStreamingAndSql {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)// 1. 创建SparkConf对象,注意这里至少给两个线程,一个线程没办法执行val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 2. 创建StreamingContext对象val ssc = new StreamingContext(sparkConf, Seconds(1))// 3. 接收Socket数据val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)// 4. 对数据进行处理val words: DStream[String] = socketTextStream.flatMap(_.split(" "))// 5. 对DStream进行处理,将RDD转换成DataFramewords.foreachRDD(rdd => {// 获取 SparkSessionval sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import sparkSession.implicits._val dataFrame: DataFrame = rdd.toDF("word")// 将dataFrame注册成表dataFrame.createOrReplaceTempView("words")// 统计每个单词出现的次数val result: DataFrame = sparkSession.sql("select word, count(*) as count from words group by word")// 展示结果result.show()})// 6. 开启流式计算ssc.start()ssc.awaitTermination()}
}

Spark Streaming容错

1. 节点失败容错

  • SparkStreaming运行流程:

  • 当一个 Executor 失败时:会将 task 重新发送到备份的数据块所在的 Executor
    • Tasks和Receiver自动的重启,不需要做任何的配置

  • 当 Driver 失败时:使用 checkpoint 机制恢复失败的 Driver

  • 使用 checkpoint 机制,会定期将 Driver 信息写入到 HDFS 中

  • 步骤一:设置自动重启 Driver 程序
# Standalone: 在spark-submit提交任务时,增加两个参数 `--deploy-mode cluster` 和 `--supervise`
spark-submit \
--master spark://node01:7077 \
--deploy-mode cluster \
--supervise \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar# Yarn: 在spark-submit提交任务时,增加参数 `--deploy-mode cluster`,并设置 `yarn.resourcemanager.am.max-attemps`
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar<property><name>yarn.resourcemanager.am.max-attempts</name><value>4</value><description>The maximum number of application master execution attempts.</description>
</property>
  • 步骤二:设置 HDFS 的 checkpoint 目录
streamingContext.checkpoint(hdfsDirectory) 
  • 步骤三:代码实现
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...)   // new contextval lines = ssc.socketTextStream(...) // create DStreams...ssc.checkpoint(checkpointDirectory)   // set checkpoint directoryssc
}// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...// Start the context
context.start()
context.awaitTermination()

2. 数据丢失如何处理

  • 可以利用 WAL 机制,将数据写入到 HDFS 中,这样当发生节点宕机时,可以从 WAL 中恢复

  • 步骤一:设置 checkpoint 目录
streamingContext.checkpoint(hdfsDirectory)
  • 步骤二:开启 WAL 日志
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  • 步骤三:需要 reliable receiver
    • 当数据写完 WAL 后,才告诉数据源已经消费,对于没有告诉数据的数据,可以从数据源中重新消费数据。
  • 步骤四:取消备份
    • 使用 StorageLevel.MEMORY_AND_DISK_SER 来存储数据源,不需要后缀为 2 的策略,因为 HDFS 已经是多副本了。
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_SER)

  • Reliable Receiver:当数据接收到,并且已经备份存储后,再发送回执给数据源;
  • Unreliable Receiver:不发送回执给数据源
  • WAL:使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
    • 对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,向数据源确认接收到数据,然后把数据存储在executor的内存中,然后在exector上运行任务处理这些数据。
    • 如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性。
    • 此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样driver重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。

3. Task执行很慢容错

  • 开启推测机制:假设总的 task 有 10 个,成功的 task 数量 > spark.speculation.quantile * 10,正在运行的 task 的运行时间 > spark.speculation.multoplier * 成功运行task的平均时间,则这个正在运行的 task 需要重新等待调度。
# 每隔一段时间来检查有哪些正在运行的 task 需要重新调度
spark.speculation = true
# 推测间隔时间
spark.speculation.interval = 100ms
# 推测数量阈值
spark.speculation.quantile = 0.75
spark.speculation.multoplier = 1.5

  • 在分布式环境中,导致某个 task 执行缓慢的情况有很多:负载不均、程序 bug、资源不均、数据倾斜等,而且这些情况在分布式计算环境中是常态。Speculative Task(推测Task) 这种以空间换时间的思路设计对计算资源是种压榨,同时,如果 Speculative Task 本身也变成了 Slow Task 会导致情况进一步恶化。

优雅关闭

  • 流式任务需要 7 * 24h 执行,但有时涉及到代码升级,需要主动停止程序。但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅关闭就显得至关重要了。
  • 使用外部文件系统来控制内部程序关闭,代码实现如下:
object Case11_GracefullyShutdown {private val HDFS: String = "hdfs://node01:8020"private val CHECKPOINT: String = HDFS + "/checkpoint"def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => createSsc())new Thread(new MonitorStop(ssc)).start()ssc.start()ssc.awaitTermination()}def createSsc(): _root_.org.apache.spark.streaming.StreamingContext = {val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {Some(values.sum + status.getOrElse(0))}val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)// 设置优雅关闭sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint(CHECKPOINT)val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)val wordAndCount: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)wordAndCount.print()ssc}class MonitorStop(ssc: StreamingContext) extends Runnable {override def run(): Unit = {val fs: FileSystem = FileSystem.get(new URI(HDFS), new Configuration(), "hadoop")while (true) {try {TimeUnit.SECONDS.sleep(5)} catch {case e: InterruptedException => e.printStackTrace()}val state: StreamingContextState = ssc.getState()val bool: Boolean = fs.exists(new Path(HDFS + "/stopSpark"))if (bool && state == StreamingContextState.ACTIVE) {ssc.stop(stopSparkContext = true, stopGracefully = true)System.exit(0)}}}}
}

Spark Streaming整合kafka

  • 在消费 kafka 数据时,可以有三种语义保证:
    • at most one 至多一次:数据最多处理一次或没有被处理,有可能造成数据丢失的情况;
    • at least once 至少一次:数据最少被处理一次,有可能存在重复消费的问题;
    • exactly once 精准一次:数据消费一次且仅一次

1. Spark Streaming整合kafka-0-8

  • SparkStreaming整合Kafka官方文档

方式一:Receiver-based Approach【不推荐使用】
  • 此方法使用Receiver接收数据,Receiver是使用Kafka高级消费者API实现的。
  • 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。
  • 但是在默认配置下,此方法可能会在失败时丢失数据(请参阅接收器可靠性。)
  • 为确保零数据丢失,必须在Spark Streaming中另外启用Write Ahead Logs(WAL 在Spark 1.2中引入)。
  • 这将同步保存所有收到的Kafka数据写入分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据,但是性能不好。
  • pom.xml 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.8</version>
</dependency>
  • 核心代码:
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  • 代码示例:
/*** sparkStreaming使用kafka 0.8API基于recevier来接受消息*/
object Case12_KafkaReceiver08 {private val zkQuorum = "192.168.254.120:2181"private val groupId = "KafkaReceiver08"private val topics = Map("test" -> 1)def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 设置checkpoint,将接收到的数据持久化写入到HDFSssc.checkpoint("hdfs://node01:8020/wal")// 接收kafka数据val receiverDstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)// 获取kafka的topic数据val data: DStream[String] = receiverDstream.map(_._2)// 单词计算val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)result.print()ssc.start()ssc.awaitTermination()}
}
  • 创建 kafka 的 topic 并发送数据
# 创建topic
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper 192.168.254.120:2181
# 生产发送数据
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

如果程序运行过程中,出现错误java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V这是spark-core与kafka-client中lz4版本不一致导致的,可用以下方式在程序中指定其他的压缩算法进行解决
new SparkConf().set(“spark.io.compression.codec”, “snappy”)

方式二:Direct Approach(NoReceivers)
  • 这种新的不基于Receiver的,是直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。
  • 替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。
  • 当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
  • 这种方式有如下优点:
    • 简化并行读取:
      • 如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。
      • Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。
      • 所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
    • 高性能:
      • 如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下。
      • 因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。
      • 而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
    • 一次且仅一次的事务机制:
      • 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。
      • 这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性。
      • 但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
    • 降低资源:
      • Direct不需要Receivers,其申请的Executors全部参与到计算任务中。
      • 而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。
      • 因此相同的资源申请,Direct 能够支持更大的业务。
    • 降低内存:
      • Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据。
      • 对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。
      • 而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
      • 实际应用中我们可以把原先的10G降至现在的2-4G左右。
    • 鲁棒性更好:
      • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。
      • Direct 则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
  • 代码示例:
/*** sparkStreaming使用kafka 0.8API基于Direct直连来接受消息* spark direct API接收kafka消息,从而不需要经过zookeeper,直接从broker上获取信息。*/
object Case13_KafkaDirect08 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "KafkaDirect08"private val topics = Set("test")def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 接收kafka数据val kafkaParams = Map("metadata.broker.list" -> kafkaCluster,"group.id" -> groupId)// 使用direct直连的方式接收数据val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)// 获取kafka的topic数据val data: DStream[String] = kafkaDstream.map(_._2)// 单词计算val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)result.print()ssc.start()ssc.awaitTermination()}
}
  • 要想保证数据不丢失,最简单的就是靠checkpoint的机制;但是checkpoint机制有个特点,如果代码升级了,checkpoint机制就失效了。
  • 所以如果想实现数据不丢失,那么就需要自己管理offset。

2. Spark Streaming与kafka-0-10整合

  • 支持0.10版本,或者更高的版本【推荐使用这个版本】
  • pom.xml 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId><version>${spark.version}</version>
</dependency>
  • 代码示例:
/*** sparkStreaming使用kafka 1.0API基于Direct直连来接受消息*/
object Case14_KafkaDirect10 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "KafkaDirect10"private val topics = Set("test")def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)// 1. 创建StreamingContext对象val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))// 2. 使用Direct接收kafka数据val kafkaParams = Map("bootstrap.servers" -> kafkaCluster,"group.id" -> groupId,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"enable.auto.commit" -> "false")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,// 数据本地性策略LocationStrategies.PreferConsistent,// 指定要订阅的topicConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 3. 对数据进行处理// 注意:如果你想获取到消息消费的偏移,这里需要拿到最开始的这个DStream进行操作// 如果你对该DStream进行了其他的转换之后,生成了新的DStream,新的DStream不再保存对应的消息的偏移量kafkaDStream.foreachRDD(rdd => {// 获取消息内容val dataRdd: RDD[String] = rdd.map(_.value())// 打印dataRdd.foreach(line => println(line))// 4. 提交偏移量,将偏移量信息添加到kafka中val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangeskafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})// 5. 开启流式计算ssc.start()ssc.awaitTermination()}
}

3. 解决SparkStreaming与kafka-0.8版本正好数据不丢失方案

  • 一般企业来说无论你是使用哪一套api去消费kafka中的数据,都是设置手动提交偏移量。因为自动提交(默认60s提交一次)偏移量风险比较高,可能会出现数据丢失或者数据被重复处理:
    • 数据处理失败了,自动提交了偏移量,会出现数据的丢失;
    • 数据处理成功了,自动提交偏移量失败,之后消费时会从失败的位置再次消费,导致数据重复处理。
  • 一般来说就手动去提交偏移量,将偏移量的提交通过消费者程序自己去维护,示意图如下:

  • 代码示例,偏移量存入ZK:
/*** sparkStreaming使用kafka 0.8API基于Direct直连来接受消息* 手动将偏移量数据保存到ZK中*/
object Case15_KafkaManageOffset08 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val zkQuorum = "192.168.254.120:2181"private val groupId = "consumer-manager"private val topic = "wordcount"private val topics = Set(topic)def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 创建一个 ZKGroupTopicDirs 对象,就是用来指定在zk中的存储目录,用来保存数据偏移量val topicDirs = new ZKGroupTopicDirs(groupId, topic)// 获取 ZK 中的路径 "/consumers/consumer-manager/offsets/wordcount"val zkTopicPath = topicDirs.consumerOffsetDir// 构造一个ZK的客户端,用来读写偏移量数据val zkClient = new ZkClient(zkQuorum)// 准备kafka的参数val kafkaParams = Map("metadata.broker.list" -> kafkaCluster,"group.id" -> groupId,"enable.auto.commit" -> "false")// 定义kafkaStream流var kafkaStream: InputDStream[(String, String)] = null// 获取指定的zk节点的子节点个数val childrenNum = zkClient.countChildren(zkTopicPath)// 判断是否保存过数据: 根据子节点的数量是否为0if (childrenNum > 0) {var fromOffsets: Map[TopicAndPartition, Long] = Map()for (i <- 0 until childrenNum) {// 获取子节点val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")val tp = TopicAndPartition(topic, i)// 获取数据偏移量: 将不同分区内的数据偏移量保存到map集合中fromOffsets += (tp -> partitionOffset.toLong)}// 泛型中 key, kafka中的key   value:hello tom hello jerry// 创建函数 解析数据 转换为(topic_name, message)的元组val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())// 利用底层的API创建DStream: 采用直连的方式(若之前已经消费了,则从指定的位置消费)kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {// 利用底层的API创建DStream 采用直连的方式(若之前没有消费,则这是第一次读取数据)// zk中没有子节点数据,就是第一次读取数据,直接创建直连对象kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}// 直接操作kafkaStream// 依次迭代DStream中的kafkaRDD, 只有kafkaRDD才可以强转为HasOffsetRanges, 从中获取数据偏移量信息// 之后是操作的RDD, 不能够直接操作DStream, 因为调用Transformation方法之后就不是kafkaRDD了获取不了偏移量信息kafkaStream.foreachRDD(kafkaRDD => {// 强转为HasOffsetRanges, 获取offset偏移量数据val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges// 获取数据val lines: RDD[String] = kafkaRDD.map(_._2)// 接下来就是对RDD进行操作 触发actionlines.foreachPartition(partition => partition.foreach(x => println(x)))// 手动提交偏移量到zk集群上for (o <- offsetRanges) {// 拼接zk路径val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"// 将 partition 的偏移量数据 offset 保存到zookeeper中ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}})ssc.start()ssc.awaitTermination()}
}

4. Spark Streaming如何保证exactly-once

  • 一个流式计算如果要保证 exactly-once,首先要有三点要求:
    • source 支持 reply;
    • 流计算引擎本身处理能保证 exactly-once;
    • sink支持幂等或事务更新
  • 实现数据被处理且仅被处理一次,就需要实现数据结果保存操作与偏移量保存操作再同一个事务中,或者实现幂等操作。
  • 也就是说如果想让 SparkStreaming 的程序保证 exactly-once,需要从以下三个角度出发:
    • 接收数据:从Source中接收数据,保证 exactly-once;
    • 转换数据:用DStream和RDD算子转换,保证 exactly-once;
    • 存储数据: 将结果保存到外部系统,保证 exactly-once。
  • scalikejdbc依赖:
<dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc_${scala.version}</artifactId><version>${scalikejdbc.version}</version>
</dependency>
<dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-config_${scala.version}</artifactId><version>${scalikejdbc.version}</version>
</dependency>
  • 示例代码:
/*** SparkStreaming EOS:* Input: kafka* Process: SparkStreaming* Output: MySQL* 保证EOS:* 1、偏移量自己管理,即enable.auto.commit=false,这里保存在Mysql中* 2、使用createDirectStream* 3、事务输出: 结果存储与Offset提交在Driver端同一Mysql事务中*/
class Case16_EOSKafkaMysqlAtomic {@transient lazy val log = LoggerFactory.getLogger(this.getClass)private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "consumer-eos"private val topic = "topic_eos"private val mysqlUrl = "jdbc:mysql://node01:3306/test"private val mysqlUsr = "root"private val mysqlPwd = "123456"def main(args: Array[String]): Unit = {// 准备kafka参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> kafkaCluster,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean),"group.id" -> groupId)// 数据库连接池ConnectionPool.singleton(mysqlUrl, mysqlUsr, mysqlPwd)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(5))// 1、初次启动或重启时,从指定的Partition、Offset构建TopicPartition// 2、运行过程中,每个Partition、Offset保存在内部currentOffsets = Map[TopicPartition, Long]()变量中// 3、后期Kafka Topic分区扩展,在运行过程中不能自动感知val initOffset = DB.readOnly(implicit session => {sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${groupId}".map(item => new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset")).list().apply().toMap})// CreateDirectStream: 从指定的Topic、Partition、Offset开始消费val sourceDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Assign[String, String](initOffset.keys, kafkaParams, initOffset))sourceDStream.foreachRDD(rdd => {if (!rdd.isEmpty()) {val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetRanges.foreach(offsetRange => {log.info(s"Topic: ${offsetRange.topic}, Group: ${groupId}, Partition: ${offsetRange.partition}, fromOffset: ${offsetRange.fromOffset}, untilOffset: ${offsetRange.untilOffset}")})// 统计分析val sparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import sparkSession.implicits._val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS())dataFrame.createOrReplaceTempView("tmpTable")val result = sparkSession.sql("""| select eventTimeMinute, language, count(1) pv, count(distinct(userID)) uv| from (select *, substr(eventTime,0,16) eventTimeMinute from tmpTable) as tmp| group by eventTimeMinute, language""".stripMargin).collect()// 在Driver端存储数据、提交Offset,结果存储与Offset提交在同一事务中原子执行,这里将偏移量保存在Mysql中DB.localTx(implicit session => {result.foreach(row => {sql"""insert into twitter_pv_uv (eventTimeMinute,language,pv,uv) values (${row.getAs[String]("eventTimeMinute")},${row.getAs[String]("language")},${row.getAs[Long]("pv")},${row.getAs[Long]("uv")},) on duplicate key update pv = pv, uv = uv""".update.apply()})// offset 提交offsetRanges.foreach(offsetRange => {val affectedRows =sql"""update kafka_topic_offset set offset = ${offsetRange.untilOffset}where topic = ${topic} and `group` = ${groupId} and `partition` = ${offsetRange.partition} and offset = ${offsetRange.fromOffset}""".update.apply()if (affectedRows != 1) {throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Faild!""")}})})}})ssc.start()ssc.awaitTermination()}
}

更多推荐

大数据高级开发工程师——Spark学习笔记(10)

本文发布于:2023-06-28 19:50:51,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/932672.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:学习笔记   工程师   高级   数据   Spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!