spark重点知识

编程入门 行业动态 更新时间:2024-10-03 06:28:00

spark<a href=https://www.elefans.com/category/jswz/34/1769955.html style=重点知识"/>

spark重点知识

RDD,DataFrame,DataSet
  • DataSet
    • 产生于spaek1.6
    • 比DataFrame多了泛型的支持
    • DataFrame就是DataSet[row]
    • DataSet[T]的元素经过tungsten优化,是内存的优化,可以节省空间85%
    • 自动支持各种类型的编码器,就是强类型
      • 基本类型 int ,long ,string
      • row
      • jvm对象,case class 对象
    • 使得元素是样例类对象【student(name:string;age:int】时,将类的属性名映射成sql的表字段名,所以全面支持sql和dsl操作
    • 支持编译时类型安全检查(ds.map(s=>s.不存在的字段) 这句代码编译时不通过
Hadoop,spark的各种端口
  • 当使用standalone模式时,才设计下面4个端口
    • 7077 sparkMaster进程的通信端口
    • 8080 Master 的web ui 的查看端口
    • 7078 spark worker 进程的通信端口
    • 8081 worker 的webui 的查看端口
  • 18080 spark 历史任务的webui查看端口
  • 4040 查看某个蒸菜运行的spark application 的web端口
  • 198888 是hadoop 的 mapreduce类型的任务的jobHistory的web端口
  • 8088 是hadoop Yarn 类型的任务的监控页面
RDD的特点
  • 五大特性

    • 分区列表
    • 依赖关系
    • 计算函数
    • 如果元素是key-value类型是,可以指定分区器
    • 位置优先性
  • 一个job,从action算子的最后一个RDD追溯到初始RDD之间的依赖关系,叫血缘关系,也叫lineaeg

    • 依赖分为,宽依赖和窄依赖
Broadcast 广播变量

  • 在driver中定义一个广播变量
    obj对象需要支持序列化 val obj =new XXX val bc = sc.broadCast(obj)
  • 注意obj需要序列化,因为要从driver端网络传输到各个Executor端
  • 在分布式的Executor的代码汇总获取对象
    rdd.map(x=>{ //次obj是经过网络传输后反序列化后的 val obj =bc.value })
  • spark 默认是将对象广播扫executor的每个task中,随着task数量增加,网络的传输压力也会增加,但是executor的数目比task要少很多,更适合将对象广播到executor中 ,executor 中的所有task都共享这个变量
Accumulator 累加器
  • 多个节点对同一个变量进行累加操作,spark目前只支持累加操作
  • spark有3个原生的累加器 : LongAccumulator ,DoubelAccumulator ,CollectionAccumulator
  • 还可以自定义累加器 class MyAccumulator extends AccumulatorV2{ }
spark 部署模式
  • spark应用Application
    • 运行下面的几个命令都会生成一个应用
      • spark-submit
      • spark-shell
      • spark-sql
      • 在IDEA的main方法中run,底层调用 spark Application
    • 他们都需要指定参数 --master MASTER_RUL(MASTER_URL就是资源管理器的主节点,当前程序委托谁来分配资源)
    • 企业用的最多的是 spark-submit ,因为可以支持更复杂灵活的业务
  • 提交给单机
    • spark/bin/spark-submit --master local[n] --class XX xx.jar 参数1 参数2
    • n表示本机的几核几线程
    • local[*]表示使用本机的所有CPU core
  • 提交给多节点(集群)
    • 下面的几种都有client和cluster(集群)两种模式,区别是Driver进程是运行在提交的机器上(client),还是让集群随机选一台(cluster),前者是client,后者是cluster

    • standalone

      • 使用的是Spark自带的资源管理器
      • 前提是要启动 start-master.sh ,start-slaves.sh后才能使用,jps需要看到一个Master,和多个Woeker进程
      • 提交任务的命令:
        • client : spark/bin/spark-submit --master spark://node1:7077 --deploy-mode client --class XX xx.jar 参数1 参数2
        • spark/bin/spark-submit --master spark://node1:7077 --deploy-mode cluster --class XX xx.jar 参数1 参数2
    • standaloneHA

      • 因为standalone方式的Master进程只有一个,存在单点故障隐患,此处增加多个Master进程形成高可用,借用zookeeper来选举active的master,其他的master都是standby状态
      • 多台master都要启动(比如node2也是master)
      • 提交的命令: spark/bin/spark-submit --master spark://node1:7077,node2:7077 --deploy-mode client --class XX xx.jar 参数1 参数2
    • yarn

      • 使用的是Hadoop自带的yarn资源管理器
      • 前提是运行start-yarn.sh 就启动了Yarn集群,jps需要看到ResourceManager和多个NodeManager。
      • 提交spark程序到yarn上的命令: spark/bin/spark-submit --master yarn --deploy-mode client(或者cluster) --class XX xx.jar 参数1 参数2

    • mesos【了解】

      • spark/bin/spark-submit --master mesos://host:port
    • k8s【了解】

      • spark/bin/spark-submit --master k8s://http://host:port
stage的Task,并行度,和shuffle的并行度
  • 一个Action算子会触发一个job
  • 一个job会拆分成多个stage
  • 一个stage设计多个RDD
  • 一个RDD的一个分区对应一个Task
  • 在同一个适合,一个CPUcore只能运行一个task
  • 所以stage的task数量由partition的数量决定
  • task的数目,一般手动指定CPU core的2-3倍,这样可以充分运用CPU的性能
  • 调用RDD的算子,比如parallelize,reduceByKey,join等时,可以手动设置分区数,则它就是并行度(sc.parllelize(1 to 10,4),这里设置了4个分区)
  • 在RDD中如果不手动指定分区数,则会使用默认并行度,就是spark.default.parallelism值
    • spark.default.parallelism 值在应用提交之时,就已经确定了,就是申请的core数。
    • 比如 val rdd1=sc.parallelize(xxx) ,此时没有手动指定分区数,rdd1的分区数= spark.default.parallelism 值
  • 当RDD有shuffle时,如果不手动指定分区数,并行度取决于父RDD的分区数
    • val rdd2 = rdd1.reduceByKey(+) ,rdd2的分区数=rdd1的分区数,也就是shuffle的并行度
    • val rdd3=rdd1.join(rdd2) , rdd3的分区数=rdd1和rdd2中的最大分区数,也就是shuffle的并行度
  • 当DataFrame 有shuffle时,join或groupby会触发shuffle,结果DataFrame的分区数有spark.sql.shuffle.partition 决定,默认是200 ,最好自定义指定为cpu的2-3倍,或者根据RDD的数据量而定
宽依赖,窄依赖
  • 一个Action算子会触发一个job,一个job就会对应一个DAG有向无环图,如果转换算子中有shuffle算子,则它就是宽依赖算子,在此处划分不同的stage,否则就是窄依赖。

  • 窄依赖

    • 窄依赖算子 map , filter, flatmap,coalesce
    • 如果父rdd的一个分区的数据,只进入到了子rdd的一个分区中 那就是窄依赖,不一定所有的jion都是shuffle,如下图是窄依赖
  • 宽依赖

    • 如果父rdd的一个分区的数据,分发到了子rdd的多个分区中,就是宽依赖,(会消耗网络宽带)
    • 宽依赖算子 reduceByKey , groupByKey , ByKey的算子都是,某一些join,repartition
cache,persist,checkpoint,storageLevel以及spark容错机制
  • cache
    • 将rdd的数据缓存到内存
    • 是persist的一种特殊情况
    • cache()=persist(StorageLevel.MEMORY_ONLY)
    • rdd,DataFrame,Dataset,都可以调用cache
  • persist
    • storageLevel有五个私有属性
      • 是否备份多分
      • 支持多种缓存级别
        • 带 _ONLY表示仅仅用哪种方式
        • 带_2表示有备份2次
        • 带_SER表示对元素序列化
  • checkpoint 保存在hdfs
    • 检查点,由于cache,persist将数据缓存在内存和磁盘中,依然可能丢失,但是HDFS具有高可用的特性,所有用来保存RDD的数据,可信度最高
    • 会截断血缘关系,因为cache和persist会由于断电或内存OOM崩溃,硬盘损坏,需要重新依据血缘关系恢复某个分区的数据,而HDFS天然自带3份冗余,即使丢失一份,也还有另外两份,没有必要保存血缘关系
  • spark的容错机制:如果RDD的数据部分丢失或顺坏,那么先去cache的内存中查找,如果没有则去persist磁盘中恢复,如果依然没有,则去checkpoint的HDFS中回复数去,如果依然没找,就依靠血缘关机重新计算RDD
区分集中集群概念

spark-submit用standalone模式提交任务后的执行流程

spark-submit 用 yarn-cluster模式执行流程

spark与mapreduce对比,为何spark币mapReduce快
  • spark是基于内存的计算
  • mapreduce的多个stage之间的中间结果,都会落地磁盘,大量的磁盘读写,造成速度慢
  • spark的上一个stage的reduceTask的结果以内存形式,直接提供给stage的maptask使用,所以速度快
  • spark的血缘关系,可以方便的恢复丢失的某个分区数据,但是mapreduce只能从头开始计算所有数据。
  • spark还提供了cache,persist,checkpoint这些持久化机制,将复杂的计算结果缓存起来,并进行复用,大大减少了时间,而reduce没有
  • mapreduce的计算单元是进程,而进程的启动和销毁耗时久,spark的计算单元是线程,线程的创建和销毁更轻量级
  • spark对job的DAG划分,一个stage中可以包含多个转换,比如map,filter,可以将多个窄依赖合并成一个pipeline,合并计算
sparkcore , sparksql, sparkstreaming的数据结构对比
  • sparkCore: RDD
  • sparkSQL:DataFrame,DataSet(spark1.6,将二者统一成Dataset)
  • sparkStreaming:Dstream
  • StructuredStreaming:DataFrame,DataSet
RDD的两种操作 TransFormation与Action算子
  • TransFormation算子 少个图

  • action算子

repartition与coalesce的区别
  • RDD/DataFrame/Dataset 都可以调用repartition和coalesce
  • repartition 一般用来增大分区,伴随着shuffle
    • 调用rdd1.repartition(n) 等价于调用rdd1.coalesce(n,shuffle=true)
    • 何时增大分区,比如加载一个大文件,val rdd=sc.textFile(大文件),此时可以对rdd增大分区,增加并行度
    • repartition可以增大也可以减小分区,都会产生shuffle
  • coalesce,一般用来减小分区,默认没有shuffle
    • 调用rdd1.coalesce(n)等价于调用rdd1,colesce(n,shuffle=false)
    • 如果上面的rdd1的分区数<n ,那么上面不起作用
    • 何时减小分区,当RDD经过处理后数据量大大减小,比如reduceByKey,filter,groupByKey等,可以适当减小分区

更多推荐

spark重点知识

本文发布于:2024-02-28 03:18:11,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1767870.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:重点   知识   spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!