学习笔记(4)"/>
大数据高级开发工程师——Spark学习笔记(4)
文章目录
- Spark内存计算框架
- Spark Core
- Spark的shuffle过程
- 1. HashShuffleManager
- 未经优化的HashShuffleManager
- 经过优化的HashShuffleManager
- HashShuffleManager 源码解析
- 2. SortShuffleManager
- 普通运行机制
- byPass运行机制
- SortShuffleManager源码解析
- 3. Spark的Shuffle常用参数调优
- spark.shuffle.file.buffer
- spark.reducer.maxSizeInFlight
- spark.shuffle.io.maxRetries
- spark.shuffle.io.retryWait
- spark.shuffle.memoryFraction
- spark.shuffle.sort.bypassMergeThreshold
- 4. Shuffle所有参数介绍
- 回顾Spark的任务调度
- Spark的内存管理
- 1. 堆内内存(On-heap Memory)
- 2. 堆外内存(Off-heap Memory)
- 3. Execution内存和Storage内存动态调整
- 4. Task之间内存分布
- 5. 内存调优
- Spark自定义算子以及自定义RDD
Spark内存计算框架
Spark Core
Spark的shuffle过程
- 任何一个分布式的计算系统,shuffle 都是最为致命的性能瓶颈,因为 shuffle 会产生数据的移动和网络拷贝,走网络拷贝就不是程序能决定的了,涉及到网络硬件的数据传输,所以任何时候,shuffle 都会产生性能的问题,spark 的 shuffle 经过多年发展已经逐渐趋于成熟,主要有早期的 HashShuffleManager 以及现在默认使用的 SortShuffleManager。
- Spark 的 shuffle 演进历史:
- Spark 0.8及以前 Hash Based Shuffle
- Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
- Spark 0.9 引入 ExternalAppendOnlyMap
- Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle
- Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
- Spark 1.4 引入Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-sort并入Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出历史舞台
- Spark 的所有配置项:.3.3/configuration.html
- spark-shuffle 参数配置官方说明:.3.3/configuration.html#shuffle-behavior
1. HashShuffleManager
未经优化的HashShuffleManager
- 在 Spark 早期版本(1.2.0)之前都是使用的 HashShuffleManager,其会产生大量的小文件,具体实现过程如下图所示:
- 在 mapTask 过程按照 Hash 的方式重组 partition 的数据,不进行排序。每个 mapTask 为每个 reduceTask 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中 M 表示 mapTask 个数,R 表示 reduceTask 个数),伴随大量的随机磁盘 I/O 操作与大量的内存开销。
- HashShuffleManager缺陷:
- mapTask 非常容易造成 OOM:如果产生大量的 MapTask-Buffer 很容易将缓冲区直接撑爆;
- reduceTask 非常容易造成 OOM:如果 ReduceTask-Buffer 大量获取小文件很容易将缓冲区直接撑爆;
- reduceTask 去拉取 mapTask 输出数据,大量小文件容易造成网络波动,产生大量小IO,增加机器负荷,容易引起网络失败而导致拉取失败。
经过优化的HashShuffleManager
- 原始的 HashShuffleManager 会产生大量的小文件,造成网络以及磁盘的大量浪费,所以为了解决大量小文件的问题,后来引起一种改进的 HashShuffleManager。
- 针对上面的小文件过多的问题,引入了 File Consolidation 机制。
- 一个 Executor 上所有的 mapTask 针对同一个分区(同一个 reduceTask)只生成一个文件,即将所有的 mapTask 相同的分区文件合并,这样每个 Executor 上最多只生成 N 个分区文件。
- 尽管进过优化之后的 HashShuffleManager 有一定程度的小文件数量的减少,但是还是会产生很多小文件的问题。
- 这样就减少了文件数,但是假如下游 Stage 的分区数 N 很大,还是会在每个 Executor 上生成 N 个文件。
- 同样,如果一个 Executor 上有 K 个 Core,还是会开 K*N 个 Writer Handler,所以这里仍然容易导致OOM。
HashShuffleManager 源码解析
- 导入 spark 1.2 版本的 spark-core 的 jar 包,然后就可以查看早期spark版本当中关于HashShuffleManager的源码
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>1.2.0</version>
</dependency>
- 第一步:初始化 shuffle 管理器,也就是 ShuffleBlockManager,默认使用的是 FileShuffleBlockManager 这个实现类;
- 第二步:注册 shuffle 管理器,通过 RDD 之间的依赖关系,进行注册 shuffle 管理器;
- 第三步:溢写数据,通过 getWriter 方法,获取 HashShuffleWriter 这个对象,通过这个对象调用 write 方法来进行数据的写出;
- 第四步:reduce 端接收数据,通过 getReader 方法,获取 HashShuffleReader 这个对象,通过这个对象调用 read 方法来进行数据的读取。
2. SortShuffleManager
- 为了更好地解决 HashShuffleManager 的问题,Spark 参考了 MapReduce 中 Shuffle 的处理方式,引入基于排序的 Shuffle 写操作机制。
- 总体上看来 Sort Shuffle 解决了 Hash Shuffle 的所有弊端,但是因为需要其 Shuffle 过程需要对记录进行排序,所以在性能上有所损失。
- SortShuffleManager 的运行机制主要分成两种:
- ① 普通运行机制:默认
- ② bypass 运行机制:当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值(默认为200)时,就会启用bypass机制。
普通运行机制
- 在普通模式下,每个 task 当中处理的数据,会先写入一个内存数据结构当中
- 内存数据结构是 Map 或者 Array,根据不同的 shuffle 算子,选用不同的数据结构;
- 如果是 reduceByKey 这类聚合 shuffle 算子,那么就会选用 Map 数据结构;
- 如果是 join 这种普通 shuffle 算子,那么就会选用 Array 数据结构;
- 每次写入一条数据,判断内存阈值,达到阈值,溢写到磁盘,清空内存结构数据。
- 溢写之前,会根据 key 对内存数据结构进行排序,排序之后分批次写入磁盘,每批次默认写入 10000 条,使用 java 的 BufferedOutputStream 来实现的,可以减少磁盘 IO 次数,提升性能。
- task 多次溢写,形成多个小文件,小文件最终进行合并,就是 merge 过程
- 此时将之前所有的溢写文件全部读取出来,然后依次写入最终的磁盘文件当中形成一个大文件;
- 为了解决大文件分配到各个下游 task 的数据标识问题,还会写入一份索引文件,索引文件标识了下游每个 task 当汇总所属数据的 start offset 以及 end offset。
- SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。
byPass运行机制
- byPass机制的触发条件:
- shuffle reduce task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值(默认200)
- 不是预聚合类的 shuffle 算子(也就是没有 map-side aggregation 的 shuffle 算子(例如groupByKey或者groupBy等))
- 此时 task 会为每个下游 task 都创建一个临时磁盘文件,对数据按照 key 进行 hash 取值,然后将对应的数据写入到对应的磁盘文件。最终进行磁盘文件的合并,并创建索引文件确定最后大的磁盘文件里面的数据属于哪一个下游的 reducetask。
- 该机制与普通的 SortShuffleManager 不同在于:
- 磁盘写入机制不同
- 不会对数据进行排序
SortShuffleManager源码解析
- 第一步:初始化 shuffle 管理器,也就是 IndexShuffleBlockResolver;
- 第二步:注册 shuffle 管理器,通过 RDD 之前的依赖关系,来进行注册 shuffle 管理器,这里使用到的 shuffle 管理器主要有三个。分别是 BypassMergeSortShuffleHandle、SerializedShuffleHandle 和 BaseShuffleHandle。
- ① BypassMergeSortShuffleHandle:如果 reduceTask 数量小于 200,且没有 map 端的聚合,那么就会是用 bypass 这种机制;
- ② SerializedShuffleHandle:如果 reduceTask 数量大于 200,或者 map 端使用的 shuffle 是需要进行聚合的,那么就使用普通的序列化这种机制;
- ③ BaseShuffleHandle:如果以上两种都不满足,那么就使用最基础的这种,需要对数据进行序列化。
- 第三步:溢写数据,通过 getWriter 方法,获取 HashShuffleWriter 这个对象,通过这个对象调用 write 方法来进行数据的写出;
- 第四步:reduce 端接受数据,通过 getReader 方法,获取 HashShuffleReader 这个对象,通过这个对象调用 read 方法来进行数据的读取;
- 第五步:释放 IndexShuffleBlockResolver,shuffle 管理器用完了之后,就进行释放 shuffle 管理器;
- 第六步:停止运行 shuffle,shuffle 整个阶段全部运行完成就结束 shuffle 过程。
3. Spark的Shuffle常用参数调优
- 这里我们主要讨论2.x之后的版 SortShuffleManager 的参数调优:.3.3/configuration.html#shuffle-behavior
spark.shuffle.file.buffer
- 默认值:32K
- 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
- 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
- 默认值:48m
- 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
- 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。如果shuffle过程产生大量小文件的话,建议将这个值调小,多拉取几次数据,例如可以调整成为24m,避免内存太大浪费内存。
spark.shuffle.io.maxRetries
- 默认值:3
- 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
- 调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM 的 FULL GC 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。
spark.shuffle.io.retryWait
- 默认值:5s
- 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
- 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
- 默认值:0.2
- 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
- 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。
spark.shuffle.sort.bypassMergeThreshold
- 默认值:200
- 参数说明:当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是200),则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
- 调优建议:当你使用 SortShuffleManager 时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用 bypass 机制,map-side 就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
4. Shuffle所有参数介绍
Property Name | Default | Meaning |
---|---|---|
spark.reducer.maxSizeInFlight | 48m | reduce内存缓冲区获取数据的大小 |
spark.reducer.maxReqsInFlight | Int.MaxValue | 拉取block数据的请求数量的限制,如果发生数据倾斜,会造成某个节点的拉取数据的请求量非常大,那么可以通过限制这个参数,防止节点请求数量太大造成宕机 |
spark.reducer.maxBlocksInFlightPerAddress | Int.MaxValue | 每台机器上允许拉取的最大的block块的数量 |
spark.maxRemoteBlockSizeFetchToMem | Long.MaxValue | 每台机器上最大的block块拉取到内存的数据量;如果超过此值,则写入磁盘 |
spark.shuffle.compress | true | 启用shuffle数据压缩 |
spark.shuffle.file.buffer | 32k | map端buffer缓冲区大小默认32K |
spark.shuffle.io.maxRetries | 3 | netty模式下专用,shuffle拉取数据出现错误重试的次数 |
spark.shuffle.io.numConnectionsPerPeer | 1 | netty模式下不同节点之间的连接重用次数,稍微调大这个值,可以提高效率 |
spark.shuffle.io.preferDirectBufs | true | netty模式下,启用堆外内存,以便于减少JVM的GC的次数 |
spark.shuffle.io.retryWait | 5s | netty模式下的重试时间间隔 |
spark.shuffle.service.enabled | false | 是否启用其他的shuffle机制,例如自定义的shuffle机制等,如果启用,那么必须开启spark.dynamicAllocation.enabled 为 true。也就是启用动态资源划分 |
spark.shuffle.service.port | 7337 | spark shuffle的端口 |
spark.shuffle.service.index.cache.size | 100m | index文件最大的占用内存,如果文件非常大,这个值也需要调高 |
spark.shuffle.maxChunksBeingTransferred | Long.MAX_VALUE | shuffle服务允许同时传输的chunk数据块的最大数量 |
spark.shuffle.sort.bypassMergeThreshold | 200 | byPass机制运行的条件之一,使用Sort-based shuffle manager 避免对数据进行排序 |
spark.shuffle.spill.compress | true | 溢写的数据是否进行压缩 |
spark.shuffle.accurateBlockThreshold | 100 * 1024 * 1024 | 一个阈值条件,高于该阈值,HighlyCompressedMapStatus当中的reduce的block块将被准确记录,避免在shuffle的过程中错误记录block块的大小,有助于防止程序的OOM异常 |
spark.shuffle.registration.timeout | 5000 | 注册第三方的shuffle方式的超时时间 |
spark.shuffle.registration.maxAttempts | 3 | 注册第三方的shuffle方式超时时候的重试次数 |
spark.io.encryption.enabled | false | 是否对IO流数据进行加密 |
spark.io.encryption.keySizeBits | 128 | 如果对IO流数据进行加密,加密流的key的大小,可以选择128,192和256 |
spark.io.encryption.keygen.algorithm | HmacSHA1 | IO流加密方式 |
回顾Spark的任务调度
- Driver 端运行客户端的 main 方法,构建 SparkContext 对象,在 SparkContext 对象内部依次构建 DAGScheduler 和 TaskScheduler。
- 依照 RDD 的一系列操作顺序,来生成 DAG 有向无环图。
- DAGScheduler 拿到 DAG 有向无环图之后,按照宽依赖进行 stage 的划分。每一个 stage 内部有很多并行运行的 task,最后封装在一个一个的 taskSet 集合中,然后 taskSet 发送给 TaskScheduler。
- TaskScheduler 得到 taskSet 集合后,依次遍历取出每一个 task 提交到 worker 节点上的 Executor 进程中运行。
- 所有 task 运行完成,整个任务也就结束了。
Spark的内存管理
- 在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以在应用程序启动前进行配置,静态内存管理方式早就已经被淘汰了,现在 spark 新的版本都是直接使用动态内存管理,这里我们也只讨论动态内存管理模型。
- 作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的==堆内(On-heap)==空间进行了更为详细的分配,以充分利用内存。
- 同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
- 堆内内存受到 JVM 统一管理(存在GC),堆外内存式直接向操作系统进行内存的申请和释放(不存在GC)。
- 官网关于内存管理配置说明
- 动态内存管理也叫统一内存管理,动态内存:即执行内存和存储内存之间可以互相抢占。统一内存管理模块包括了两大区域:
- ① 堆内内存(On-heap Memory)
- ② 堆外内存(Off-heap Memory)
1. 堆内内存(On-heap Memory)
- 默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:
- Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据。
- Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据。
- 用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。
- 预留内存(Reserved Memory):系统预留内存,会用来存储 Spark 内部对象。
- 整个 Executor 端堆内内存如果用图来表示的话,可以概括如下:
systemMemory = Runtime.getRuntime.maxMemory
,其实就是通过参数spark.executor.memory
或--executor-memory
配置的。- reservedMemory 在 Spark 2.2.1 中是写死的,其值等于 300MB,这个值是不能修改的(如果在测试环境下,我们可以通过
spark.testing.reservedMemory
参数进行修改); usableMemory = systemMemory - reservedMemory
,这个就是 Spark 可用内存;- 堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 spark.executor.memory参数配置,Executor 内运行的并发任务共享 JVM 堆内内存
- 这些任务在缓存 RDD 数据和广播(Broadcast)数据时,占用的内存被规划为储存(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划委执行(Executor)内存
- 剩余的部分不做特殊规划
- 那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间,不同的管理模式下,这三部分占用的空间大小各不相同
- Spark 对堆内内存的管理是一种逻辑上的“规划式”管理,因为对象实际占用内存的申请和释放都是由 JVM 完成,Spark 只能在申请后和释放前记录这些内存
- 申请内存流程如下:
- Spark 记录该对象释放的内存,删除该对象的引用;
- 等待 JVM 的垃圾回收机制释放该对象占用的堆内内存。
2. 堆外内存(Off-heap Memory)
- Spark 1.6 开始引入了 Off-heap memory 详见SPARK-11389。这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe(危险) 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存(malloc=memory allocation=动态内存分配)。
- 由于这种方式不进过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。
- 默认情况下,堆外内存是关闭的,我们可以通过
spark.memory.offHeap.enabled
参数启用,并且通过spark.memory.offHeap.size
设置堆外内存大小,单位为字节。 - 如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互不影响。
- 这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。
- 相比堆内内存,堆外内存只区分 Execution 内存和 Storage 内存,其内存分布如下图所示:图中的 maxOffHeapMemory 等于 spark.memory.offHeap.size 参数配置的。
3. Execution内存和Storage内存动态调整
- 在 Spark 1.5 之前,Execution 内存和 Storage 内存分配是静态的,换句话说就是如果 Execution 内存不足,即使 Storage 内存有很大空闲程序也是无法利用到的;反之亦然。
- 这就导致我们很难进行内存的调优工作,我们必须非常清楚地了解 Execution 和 Storage 两块区域的内存分布。
- 而目前 Execution 内存和 Storage 内存可以互相共享的。也就是说,如果 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 可以从 Storage 中申请空间;反之亦然。所以图中的虚线代表 Execution 内存和 Storage 内存是可以随着运作动态调整的,这样可以有效地利用内存资源。Execution 内存和 Storage 内存之间的动态调整可以概括如下:
- 具体的实现逻辑如下:
- 程序提交时,都会设定基本的 Execution 内存和 Storage 内存(通过 spark.memory.storageFraction 参数设置);
- 在程序运行时,如果双方空间都不足时,则存储到硬盘;将内存中的块,存储到磁盘的策略是按照 LRU 规则进行的;
- 若一方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后“归还”借用的空间;
- Storage 内存的空间被对方占用后,目前的实现是无法让对方“归还”,因为考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用。
- 注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。
4. Task之间内存分布
- 为了更好地使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存。
- 当 Task 需要在 Execution 内存区域申请 numBytes 内存时:
- 其首先,判断 HashMap 里面是否维护着这个 Task 内存使用情况,如果没有,则将这个 Task 内存使用置为 0,并且以 TaskId 为 key,内存使用值为 value 加入到 HashMap 里面;
- 之后为这个 Task 申请 numBytes 内存,如果 Execution 内存区域正好大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,然后返回;
- 如果当前 Execution 内存区域无法申请到每个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。
- 每个 Task 可以使用 Execution 内存大小范围 1/2N ~ 1N,其中 N 为当前 Executor 内正在运行的 Task 个数。
- 一个 Task 能够运行必须申请到最小内存为(1/2N * Execution 内存);
- 当 N = 1 时,Task 可以使用全部的 Execution 内存。
- 比如,如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为 5,则该 Task 可以申请的内存范围为 10/(2*5) ~ 10/5,也就是 1GB ~ 2GB。
5. 内存调优
- Spark 性能调优的第一步,就是为任务分配更多的资源;在一定范围内,增加资源的分配与性能的提升是成正比的;实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
- 资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如下:
spark-submit \
--class com.xxx.spark.TestSpark \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar
名称 | 说明 |
---|---|
–num-executors | 配置Executor的数量 |
–driver-memory | 配置Driver内存(影响不大) |
–executor-memory | 配置每个Executor的内存大小 |
–executor-cores | 配置每个Executor的CPU core数量 |
- 调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。
- 对于具体资源的分配,我们分别讨论 Spark 的两种 Cluster 运行模式:
- 第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源,在编写 submit 脚本时,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPU core。
- 第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在表写 submit 脚本时,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。
- 各项资源进行调节后,得到的性能提升如下表:
名称 | 解析 |
---|---|
增加Executor个数 | 在资源允许的情况下,增加Executor的个数可以提高执行task的并行度。 比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task, 如果将Executor的个数增加到8个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。 |
增加每个Executor的CPU core个数 | 在资源允许的情况下,增加每个Executor的Cpu core个数,可以提高执行task的并行度。 比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task, 如果将每个Executor的CPU core个数增加到4个(资源允许的情况下), 那么可以并行执行16个task,此时的并行能力提升了一倍 |
增加每个Executor的内存量 | 在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点: ①可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少, 甚至可以不写入磁盘,减少了可能的磁盘IO; ②可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据, 写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO; ③ 可以为task的执行提供更多内存,在task的执行过程中可能创建很多对象, 内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整体性能。 |
- 生产环境Spark submit脚本配置:
spark-submit \
--class com.xxx.spark.WorkCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar
- 参数配置参考值:
- –num-executors:50~100
- –driver-memory:1G~5G
- –executor-memory:6G~10G
- –executor-cores:3
- –master:实际生产环境一定使用yarn-cluster
Spark自定义算子以及自定义RDD
- RDD 已经给我们提供了很多的各种类型的算子,例如 transformation 类型的算子或者 action 类型的算子。
- 如果这些算子还不够我们使用的话,我们还可以自定义各种算子(其实就是定义方法)给RDD进行扩种,例如我们可以自定义 transformation 或者自定义 action 的方法,给 RDD 进一步的扩充算子。
- 需求:
- 读取文件内容,将数据进行切分转换成为对象,然后通过自定义算子,将数据中的金额全部抽取出来成为一个RDD[Double];
- 然后通过自定义算子对当中的数据进行计算,计算金额总和;
- 最后自定义RDD,通过对金额进行打折之后返回一个新的RDD。
- 数据文件 sales.txt 内容如下:
1,userid1,itemid1,128.00
2,userid2,itemid2,135.00
3,userid3,itemid3,147.00
4,userid4,itemid4,196.00
5,userid5,itemid5,178.00
6,userid6,itemid6,108.00
- 代码实现
/*** 定义样例类:封装数据*/
case class SalesRecord(val transactionId: String,val customerId: String,val itemId: String,val itemValue: Double) extends Serializable/*** 定义增强函数*/
class CustomFunctions(rdd: RDD[SalesRecord]) {def changeDatas: RDD[Double] = rdd.map(x => x.itemValue)def getTotalValue: Double = rdd.map(x => x.itemValue).sum()def discount(discountPercentage: Double) = new CustomRDD(rdd, discountPercentage)
}
object CustomFunctions {implicit def addIteblogCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}
/*** 自定义定义RDD*/
class CustomRDD(prev: RDD[SalesRecord], discountPercentage: Double) extends RDD[SalesRecord](prev) {override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = {firstParent[SalesRecord].iterator(split, context).map(salesRecord => {val discount = salesRecord.itemValue * discountPercentage// 样例类,不需要newSalesRecord(salesRecord.transactionId, salesRecord.customerId, salesRecord.itemId, discount)})}override protected def getPartitions: Array[Partition] = {firstParent[SalesRecord].partitions}
}object Case08_CustomRDD {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val sc = new SparkContext(conf)val dataRDD = sc.textFile(this.getClass.getClassLoader.getResource("sales.txt").getPath)val salesRecordRDD: RDD[SalesRecord] = dataRDD.map(row => {val colValues = row.split(",")SalesRecord(colValues(0), colValues(1), colValues(2), colValues(3).toDouble)})import com.yw.spark.example.cases.CustomFunctions._// 总金额println("Spark RDD API: " + salesRecordRDD.map(_.itemValue).sum)// output: Spark RDD API: 892.0// 通过隐式转换的方法,增加rdd的transformation算子// 需求一:获得item金额val moneyRDD: RDD[Double] = salesRecordRDD.changeDatasprintln("customer RDD API: " + moneyRDD.collect().toBuffer)// output: customer RDD API: ArrayBuffer(128.0, 135.0, 147.0, 196.0, 178.0, 108.0)// 需求二:给rdd增加action算子,获得总金额val totalResult: Double = salesRecordRDD.getTotalValueprintln("total_result: " + totalResult)// output: total_result: 892.0// 需求三:自定义RDD,将RDD转换成为新的RDDval resultCountRDD: CustomRDD = salesRecordRDD.discount(0.8)println(resultCountRDD.collect().toBuffer)// output: ArrayBuffer(SalesRecord(1,userid1,itemid1,102.4), SalesRecord(2,userid2,itemid2,108.0), SalesRecord(3,userid3,itemid3,117.60000000000001), SalesRecord(4,userid4,itemid4,156.8), SalesRecord(5,userid5,itemid5,142.4), SalesRecord(6,userid6,itemid6,86.4))sc.stop()}
}
更多推荐
大数据高级开发工程师——Spark学习笔记(4)
发布评论