Spark Shuffle参数 及 内存管理

编程入门 行业动态 更新时间:2024-10-27 10:28:46

Spark Shuffle参数 及 <a href=https://www.elefans.com/category/jswz/34/1766335.html style=内存管理"/>

Spark Shuffle参数 及 内存管理

      • Spark Shuffle 参数
      • Spark 的内存管理
      • 静态内存管理
      • 统一内存管理

Spark Shuffle 参数

在工作过程中,我们需要用到Spark Shuffle 相关的参数

参数名称默认值释义
spark.shuffle.consolidateFilestrue合并shuffle期间产生的中间文件
spark.shuffle.file.buffer shuffle32缓冲区的大小建议调大成64 或128
spark.reducer.maxSizeInFlight32从每个reduce任务中同时获取的最大map输出大小。由于每个输出都要求我们创建一个缓冲区来接收它,这表示每个reduce任务有固定的内存开销,因此,除非您有大量的内存,否则请保持较小的内存开销。建议调大点例如96M
spark.shuffle.io.maxRetries3huffle拉取数据失败的最大的尝试次数
spark.shuffle.io.retryWait5s等待拉取数据的时间间隔
spark.shuffle.sort.bypassMergeThreshold200
spark.shuffle.spill.compresstrue是否压缩shuffle过程中溢写的文件,会默认使用spark.io.compress.codec 压缩
spark.io.compression.codeclz4压缩内部的数据,比如RDD的Partition、event log、广播变量、shuffle的输出,可以设置为lz4,lzf,snappy,zstd
spark.shuffle.compresstrue是否压缩Map端的输出文件配合上面的codec使用进行压缩
spark.shuffle.managerHASH默认是Hash Spark 1.1 中可以设置为SORT

Spark 的内存管理

主要分为两个方面:
execution:用于做计算 Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations,

sotrage:用于存储 while storage memory refers to that used for caching and propagating internal data across the cluster.

内存管理:MemoryManager

  • spark.memory.useLegacyMode false 是否启用历史的模式进行内存管理

从配置中获取 SparkEnv.scala

Spark1.6 之后使用的是统一内存管理,之前使用的是静态内存管理

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =if (useLegacyMemoryManager) {new StaticMemoryManager(conf, numUsableCores)//静态内存管理} else {UnifiedMemoryManager(conf, numUsableCores) // 统一内存管理}

静态内存管理

静态内存管理的执行内存和存储内存是分开 的,当分配1G的内存的时候,实际上的能够使用执行内存是160M左右,能够使用的存储内存为540M左右。

StaticMemoryManager(conf, numUsableCores){  private def getMaxExecutionMemory(conf: SparkConf): Long = {,// 得到最大的执行内存 val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) // 0.2val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) // 0.8(systemMaxMemory * memoryFraction * safetyFraction).toLong               // 1000M * 0.2 * 0.8 = 160M}private def getMaxStorageMemory(conf: SparkConf): Long = {// 得到最大存储内存val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)// 0.6val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)// 0.9(systemMaxMemory * memoryFraction * safetyFraction).toLong				// 1000M* 0.6 * 0.9 = 540M}  
}

统一内存管理

在统一内存管理中Sprk的执行内存和存储内存不再单独出来,默认情况下execution和storage各占百分之50,这块是可以动态调整的。

object UnifiedMemoryManager {private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 // 300M 的系统预留内存def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {val maxMemory = getMaxMemory(conf){val usableMemory = systemMemory - reservedMemory 				// 系统内存减去 预留内存1000-300=700Mval memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)// 0.6(usableMemory * memoryFraction).toLong // 700M * 0.6 = 420M}
}


当执行端的内存没有被占用的时候,存储端可以获得执行端的内存来进行使用,执行端的内存被存储端占用之后,如果此时执行端需要使用内存,那么会将存储端占用的那部分内存强制剔除
当存储端空闲,执行端繁忙的时候,执行端可以借用存储端的内存,当存储端需要用的时候,内存端是不会还的

更多推荐

Spark Shuffle参数 及 内存管理

本文发布于:2023-07-28 20:16:19,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1298328.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:内存管理   参数   Spark   Shuffle

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!