Flink_Flink 的分布式缓存

编程入门 行业动态 更新时间:2024-10-23 20:20:30

Flink_Flink 的<a href=https://www.elefans.com/category/jswz/34/1770120.html style=分布式缓存"/>

Flink_Flink 的分布式缓存

文章目录

  • Flink 的分布式缓存
  • Flink Accumulators & Counters(了解)

Flink 的分布式缓存


操作步骤

  1. 将 distribute_cache_student 文件上传到 HDFS / 目录下
  2. 获取批处理运行环境
  3. 创建成绩数据集
  4. 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
    a. RichMapFunction 的 open 方法中,获取分布式缓存数据
    b. 在 map 方法中进行转换
  5. 实现 open 方法
    a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
    b. 使用 Scala.fromFile 读取文件,并获取行
    c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List
  6. 实现 map 方法
    a. 从分布式缓存中根据学生 ID 过滤出来学生
    b. 获取学生姓名
    c. 构建最终结果元组
  7. 打印测试

代码实现

package com.czxy.flink.batchimport java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source/*** 需求:* 创建一个 成绩 数据集* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))* 请通过分布式缓存获取到学生姓名, 将数据转换为* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名*/
object BatchDisCachedFile {def main(args: Array[String]): Unit = {/*** 实现步骤:* 1) 将 distribute_cache_student 文件上传到 HDFS / 目录下* 2) 获取批处理运行环境* 3) 创建成绩数据集* 4) 对 成绩 数据集进行 map 转换, 将( 学生 ID, 学科, 分数) 转换为( 学生姓名, 学科,* 分数)*  a. RichMapFunction 的 open 方法中, 获取分布式缓存数据*  b. 在 map 方法中进行转换* 5) 实现 open 方法*  a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件*  b. 使用 Scala.fromFile 读取文件, 并获取行*  c. 将文本转换为元组( 学生 ID, 学生姓名) , 再转换为 List* 6) 实现 map 方法*  a. 从分布式缓存中根据学生 ID 过滤出来学生b. 获取学生姓名*  c. 构建最终结果元组* 7) 打印测试*/// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//注册分布式缓存文件env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student", "student")//创建成绩数据集import org.apache.flink.api.scala._val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var stuMap: Map[Int, String] = null//初始化的时候只被执行一次override def open(parameters: Configuration): Unit = {//获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()stuMap = linesIter.map(item => {val itemArr: Array[String] = item.split(",")(itemArr(0).toInt, itemArr(1))}).toMap}//每条数据都会执行一次override def map(value: (Int, String, Int)): (String, String, Int) = {val name: String = stuMap.getOrElse(value._1, "")(name, value._2, value._3)}})//输出打印测试resultDataSet.print()}
}

Flink Accumulators & Counters(了解)



代码实现

package com.czxy.flink.batchimport org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 需求:* 给定一个数据源* "a","b","c","d"* 通过累加器打印出多少个元素*/
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1.创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.构建数据集import org.apache.flink.api.scala._val sourceDataSet: DataSet[String] = env.fromElements("a", "b", "c", "d")//3.数据处理val counterDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {//1) 创建累加器val counter: IntCounter = new IntCounter()override def open(parameters: Configuration): Unit = {//2) 注册累加器getRuntimeContext.addAccumulator("myAccumulator", counter)}//每条数据都会执行一次override def map(value: String): String = {//3) 使用累加器counter.add(1)value}})
//    counterDataSet.print()counterDataSet.writeAsText("day02/data/output/Accumulator").setParallelism(1)val result: JobExecutionResult = env.execute("BatchCounterDemo")val myAccumulatorValue: Int = result.getAccumulatorResult[Int]("myAccumulator")println(myAccumulatorValue)}
}
  • Flink Broadcast 和 Accumulators 的区别:
    Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间 传递变量。
    广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。

更多推荐

Flink_Flink 的分布式缓存

本文发布于:2023-07-27 22:08:21,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1225148.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分布式   缓存   Flink_Flink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!