重点知识"/>
spark重点知识
RDD,DataFrame,DataSet
- DataSet
- 产生于spaek1.6
- 比DataFrame多了泛型的支持
- DataFrame就是DataSet[row]
- DataSet[T]的元素经过tungsten优化,是内存的优化,可以节省空间85%
- 自动支持各种类型的编码器,就是强类型
- 基本类型 int ,long ,string
- row
- jvm对象,case class 对象
- 使得元素是样例类对象【student(name:string;age:int】时,将类的属性名映射成sql的表字段名,所以全面支持sql和dsl操作
- 支持编译时类型安全检查(ds.map(s=>s.不存在的字段) 这句代码编译时不通过
Hadoop,spark的各种端口
- 当使用standalone模式时,才设计下面4个端口
- 7077 sparkMaster进程的通信端口
- 8080 Master 的web ui 的查看端口
- 7078 spark worker 进程的通信端口
- 8081 worker 的webui 的查看端口
- 18080 spark 历史任务的webui查看端口
- 4040 查看某个蒸菜运行的spark application 的web端口
- 198888 是hadoop 的 mapreduce类型的任务的jobHistory的web端口
- 8088 是hadoop Yarn 类型的任务的监控页面
RDD的特点
-
五大特性
- 分区列表
- 依赖关系
- 计算函数
- 如果元素是key-value类型是,可以指定分区器
- 位置优先性
-
一个job,从action算子的最后一个RDD追溯到初始RDD之间的依赖关系,叫血缘关系,也叫lineaeg
- 依赖分为,宽依赖和窄依赖
Broadcast 广播变量
- 在driver中定义一个广播变量
obj对象需要支持序列化 val obj =new XXX val bc = sc.broadCast(obj)
- 注意obj需要序列化,因为要从driver端网络传输到各个Executor端
- 在分布式的Executor的代码汇总获取对象
rdd.map(x=>{ //次obj是经过网络传输后反序列化后的 val obj =bc.value })
- spark 默认是将对象广播扫executor的每个task中,随着task数量增加,网络的传输压力也会增加,但是executor的数目比task要少很多,更适合将对象广播到executor中 ,executor 中的所有task都共享这个变量
Accumulator 累加器
- 多个节点对同一个变量进行累加操作,spark目前只支持累加操作
- spark有3个原生的累加器 : LongAccumulator ,DoubelAccumulator ,CollectionAccumulator
- 还可以自定义累加器 class MyAccumulator extends AccumulatorV2{ }
spark 部署模式
- spark应用Application
- 运行下面的几个命令都会生成一个应用
- spark-submit
- spark-shell
- spark-sql
- 在IDEA的main方法中run,底层调用 spark Application
- 他们都需要指定参数 --master MASTER_RUL(MASTER_URL就是资源管理器的主节点,当前程序委托谁来分配资源)
- 企业用的最多的是 spark-submit ,因为可以支持更复杂灵活的业务
- 运行下面的几个命令都会生成一个应用
- 提交给单机
- spark/bin/spark-submit --master local[n] --class XX xx.jar 参数1 参数2
- n表示本机的几核几线程
- local[*]表示使用本机的所有CPU core
- 提交给多节点(集群)
-
下面的几种都有client和cluster(集群)两种模式,区别是Driver进程是运行在提交的机器上(client),还是让集群随机选一台(cluster),前者是client,后者是cluster
-
standalone
- 使用的是Spark自带的资源管理器
- 前提是要启动 start-master.sh ,start-slaves.sh后才能使用,jps需要看到一个Master,和多个Woeker进程
- 提交任务的命令:
- client : spark/bin/spark-submit --master spark://node1:7077 --deploy-mode client --class XX xx.jar 参数1 参数2
- spark/bin/spark-submit --master spark://node1:7077 --deploy-mode cluster --class XX xx.jar 参数1 参数2
- client : spark/bin/spark-submit --master spark://node1:7077 --deploy-mode client --class XX xx.jar 参数1 参数2
-
standaloneHA
- 因为standalone方式的Master进程只有一个,存在单点故障隐患,此处增加多个Master进程形成高可用,借用zookeeper来选举active的master,其他的master都是standby状态
- 多台master都要启动(比如node2也是master)
- 提交的命令: spark/bin/spark-submit --master spark://node1:7077,node2:7077 --deploy-mode client --class XX xx.jar 参数1 参数2
-
yarn
- 使用的是Hadoop自带的yarn资源管理器
- 前提是运行start-yarn.sh 就启动了Yarn集群,jps需要看到ResourceManager和多个NodeManager。
- 提交spark程序到yarn上的命令: spark/bin/spark-submit --master yarn --deploy-mode client(或者cluster) --class XX xx.jar 参数1 参数2
-
mesos【了解】
- spark/bin/spark-submit --master mesos://host:port
-
k8s【了解】
- spark/bin/spark-submit --master k8s://http://host:port
-
stage的Task,并行度,和shuffle的并行度
- 一个Action算子会触发一个job
- 一个job会拆分成多个stage
- 一个stage设计多个RDD
- 一个RDD的一个分区对应一个Task
- 在同一个适合,一个CPUcore只能运行一个task
- 所以stage的task数量由partition的数量决定
- task的数目,一般手动指定CPU core的2-3倍,这样可以充分运用CPU的性能
- 调用RDD的算子,比如parallelize,reduceByKey,join等时,可以手动设置分区数,则它就是并行度(sc.parllelize(1 to 10,4),这里设置了4个分区)
- 在RDD中如果不手动指定分区数,则会使用默认并行度,就是spark.default.parallelism值
- spark.default.parallelism 值在应用提交之时,就已经确定了,就是申请的core数。
- 比如 val rdd1=sc.parallelize(xxx) ,此时没有手动指定分区数,rdd1的分区数= spark.default.parallelism 值
- 当RDD有shuffle时,如果不手动指定分区数,并行度取决于父RDD的分区数
- val rdd2 = rdd1.reduceByKey(+) ,rdd2的分区数=rdd1的分区数,也就是shuffle的并行度
- val rdd3=rdd1.join(rdd2) , rdd3的分区数=rdd1和rdd2中的最大分区数,也就是shuffle的并行度
- 当DataFrame 有shuffle时,join或groupby会触发shuffle,结果DataFrame的分区数有spark.sql.shuffle.partition 决定,默认是200 ,最好自定义指定为cpu的2-3倍,或者根据RDD的数据量而定
宽依赖,窄依赖
-
一个Action算子会触发一个job,一个job就会对应一个DAG有向无环图,如果转换算子中有shuffle算子,则它就是宽依赖算子,在此处划分不同的stage,否则就是窄依赖。
-
窄依赖
- 窄依赖算子 map , filter, flatmap,coalesce
- 如果父rdd的一个分区的数据,只进入到了子rdd的一个分区中 那就是窄依赖,不一定所有的jion都是shuffle,如下图是窄依赖
-
宽依赖
- 如果父rdd的一个分区的数据,分发到了子rdd的多个分区中,就是宽依赖,(会消耗网络宽带)
- 宽依赖算子 reduceByKey , groupByKey , ByKey的算子都是,某一些join,repartition
cache,persist,checkpoint,storageLevel以及spark容错机制
- cache
- 将rdd的数据缓存到内存
- 是persist的一种特殊情况
- cache()=persist(StorageLevel.MEMORY_ONLY)
- rdd,DataFrame,Dataset,都可以调用cache
- persist
- storageLevel有五个私有属性
- 是否备份多分
- 支持多种缓存级别
- 带 _ONLY表示仅仅用哪种方式
- 带_2表示有备份2次
- 带_SER表示对元素序列化
- 是否备份多分
- storageLevel有五个私有属性
- checkpoint 保存在hdfs
- 检查点,由于cache,persist将数据缓存在内存和磁盘中,依然可能丢失,但是HDFS具有高可用的特性,所有用来保存RDD的数据,可信度最高
- 会截断血缘关系,因为cache和persist会由于断电或内存OOM崩溃,硬盘损坏,需要重新依据血缘关系恢复某个分区的数据,而HDFS天然自带3份冗余,即使丢失一份,也还有另外两份,没有必要保存血缘关系
- spark的容错机制:如果RDD的数据部分丢失或顺坏,那么先去cache的内存中查找,如果没有则去persist磁盘中恢复,如果依然没有,则去checkpoint的HDFS中回复数去,如果依然没找,就依靠血缘关机重新计算RDD
区分集中集群概念
spark-submit用standalone模式提交任务后的执行流程
spark-submit 用 yarn-cluster模式执行流程
spark与mapreduce对比,为何spark币mapReduce快
- spark是基于内存的计算
- mapreduce的多个stage之间的中间结果,都会落地磁盘,大量的磁盘读写,造成速度慢
- spark的上一个stage的reduceTask的结果以内存形式,直接提供给stage的maptask使用,所以速度快
- spark的血缘关系,可以方便的恢复丢失的某个分区数据,但是mapreduce只能从头开始计算所有数据。
- spark还提供了cache,persist,checkpoint这些持久化机制,将复杂的计算结果缓存起来,并进行复用,大大减少了时间,而reduce没有
- mapreduce的计算单元是进程,而进程的启动和销毁耗时久,spark的计算单元是线程,线程的创建和销毁更轻量级
- spark对job的DAG划分,一个stage中可以包含多个转换,比如map,filter,可以将多个窄依赖合并成一个pipeline,合并计算
sparkcore , sparksql, sparkstreaming的数据结构对比
- sparkCore: RDD
- sparkSQL:DataFrame,DataSet(spark1.6,将二者统一成Dataset)
- sparkStreaming:Dstream
- StructuredStreaming:DataFrame,DataSet
RDD的两种操作 TransFormation与Action算子
-
TransFormation算子 少个图
-
action算子
repartition与coalesce的区别
- RDD/DataFrame/Dataset 都可以调用repartition和coalesce
- repartition 一般用来增大分区,伴随着shuffle
- 调用rdd1.repartition(n) 等价于调用rdd1.coalesce(n,shuffle=true)
- 何时增大分区,比如加载一个大文件,val rdd=sc.textFile(大文件),此时可以对rdd增大分区,增加并行度
- repartition可以增大也可以减小分区,都会产生shuffle
- coalesce,一般用来减小分区,默认没有shuffle
- 调用rdd1.coalesce(n)等价于调用rdd1,colesce(n,shuffle=false)
- 如果上面的rdd1的分区数<n ,那么上面不起作用
- 何时减小分区,当RDD经过处理后数据量大大减小,比如reduceByKey,filter,groupByKey等,可以适当减小分区
更多推荐
spark重点知识
发布评论