Spark memory storage: MemoryStore



```scala
private sealed trait MemoryEntry[T] {
  def size: Long             // size of the block
  def memoryMode: MemoryMode // memory mode the block is stored in (ON_HEAP or OFF_HEAP)
  def classTag: ClassTag[T]  // ClassTag of the block's values
}

private case class DeserializedMemoryEntry[T](
    value: Array[T],
    size: Long,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP // deserialized values can only live on-heap
}

private case class SerializedMemoryEntry[T](
    buffer: ChunkedByteBuffer,
    memoryMode: MemoryMode,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  def size: Long = buffer.size
}
```

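In the MemoryStore, a block is stored in one of two forms: serialized, as bytes in a ChunkedByteBuffer (on-heap or off-heap), or deserialized, as an array of Java objects, which can only live in on-heap memory.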
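To make the two forms concrete, here is a minimal standalone sketch that mirrors MemoryEntry without depending on Spark. It is hypothetical: Mode, OnHeap and OffHeap stand in for Spark's MemoryMode, a plain Array[Byte] stands in for ChunkedByteBuffer, and the deserialized size is simply passed in by the caller rather than estimated.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's MemoryMode
sealed trait Mode
case object OnHeap extends Mode
case object OffHeap extends Mode

sealed trait Entry[T] {
  def size: Long
  def mode: Mode
  def classTag: ClassTag[T]
}

// Deserialized: an array of JVM objects, so the mode is fixed to on-heap
final case class DeserializedEntry[T](values: Array[T], size: Long, classTag: ClassTag[T])
    extends Entry[T] {
  val mode: Mode = OnHeap
}

// Serialized: raw bytes (Array[Byte] stands in for ChunkedByteBuffer), on- or off-heap;
// the size is derived from the bytes themselves
final case class SerializedEntry[T](bytes: Array[Byte], mode: Mode, classTag: ClassTag[T])
    extends Entry[T] {
  def size: Long = bytes.length.toLong
}

def describe[T](e: Entry[T]): String = e match {
  case DeserializedEntry(values, size, _) => s"deserialized: ${values.length} values, $size bytes, ${e.mode}"
  case SerializedEntry(bytes, mode, _)     => s"serialized: ${bytes.length} bytes, $mode"
}

val d = DeserializedEntry(Array(1, 2, 3), 64L, implicitly[ClassTag[Int]])
val s = SerializedEntry[String](Array[Byte](1, 2, 3, 4), OffHeap, implicitly[ClassTag[String]])
println(describe(d))
println(describe(s))
```

As in Spark, only the serialized variant carries a memory mode as a field; the deserialized variant hard-codes on-heap.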


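  1. Part of the MemoryStore's memory is used to store blocks (storage memory).
  2. Another part is used to unroll blocks (unroll memory). What does unrolling mean? Suppose you query a database, get back an iterator, and want to cache the results.
    You cannot cache the iterator itself, because it may fetch rows from the database one at a time; you have to materialize it, for example with Iterator.toList. Doing that pulls all of the data in at once and can exhaust memory. Unroll memory is the memory used while materializing (unrolling) such an iterator, and it is reserved gradually so the store can give up before memory runs out.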
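The split between storage memory and unroll memory can be modeled with a toy pool. This is a hypothetical sketch, not Spark's MemoryManager: one fixed budget is shared by both kinds, unroll memory is reserved while an iterator is materialized, and on success it is released and the entry's final size is charged to storage under a single lock, mirroring the "transfer" that putIterator performs.

```scala
// Hypothetical toy pool: one fixed budget shared by storage and unroll memory
final class ToyMemoryPool(val maxMemory: Long) {
  private var storageUsed = 0L
  private var unrollUsed = 0L

  def free: Long = synchronized { maxMemory - storageUsed - unrollUsed }
  def storage: Long = synchronized { storageUsed }
  def unroll: Long = synchronized { unrollUsed }

  // Reserve unroll memory; returns false if the budget would be exceeded
  def reserveUnroll(bytes: Long): Boolean = synchronized {
    if (bytes <= free) { unrollUsed += bytes; true } else false
  }

  def releaseUnroll(bytes: Long): Unit = synchronized {
    unrollUsed -= math.min(bytes, unrollUsed)
  }

  // Atomically release `unrollBytes` of unroll memory and charge `finalSize` to storage.
  // The caller must already hold at least finalSize bytes of unroll memory, so the
  // storage charge cannot fail (Spark asserts the same thing).
  def transferToStorage(unrollBytes: Long, finalSize: Long): Unit = synchronized {
    require(finalSize <= unrollBytes, "caller must have reserved enough unroll memory")
    releaseUnroll(unrollBytes)
    storageUsed += finalSize
  }
}

val pool = new ToyMemoryPool(1000L)
val first = pool.reserveUnroll(300L)  // start unrolling a block
val second = pool.reserveUnroll(150L) // the block grew, ask for more
pool.transferToStorage(450L, 420L)    // fully unrolled; the final size is 420 bytes
println(s"storage=${pool.storage} unroll=${pool.unroll} free=${pool.free}")
```

Doing the release and the acquire under one lock means no other task can grab the freed unroll memory in between.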
The private method putIterator
```scala
  /**
   * Attempt to put the given block in memory store as values or bytes.
   * (This backs putIteratorAsValues and putIteratorAsBytes; putIteratorAsValues stores the
   * values deserialized, in on-heap memory.)
   *
   * It's possible that the iterator is too large to materialize and store in memory. To avoid
   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
   * whether there is enough free memory. If the block is successfully materialized, then the
   * temporary unroll memory used during the materialization is "transferred" to storage memory,
   * so we won't acquire more memory than is actually needed to store the block.
   *
   * @param blockId The block id.
   * @param values The values which need to be stored.
   * @param classTag the [[ClassTag]] for the block.
   * @param memoryMode The memory mode the values are saved in (ON_HEAP or OFF_HEAP).
   * @param valuesHolder A holder that supports storing records of values into the memory store
   *        as values or bytes.
   * @return if the block is stored successfully, return the stored data size. Else return the
   *         memory reserved for unrolling the block (there are two reasons the store can fail:
   *         first, the block is only partially unrolled; second, the block is entirely unrolled
   *         and the actual stored data size is larger than reserved, but we can't request extra
   *         memory).
   */
  private def putIterator[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode,
      valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size (growth factor, 1.5 by default)
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      // Store the next value
      valuesHolder.storeValue(values.next())
      // Is it time to check memory usage?
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // Estimate how much memory the values stored so far occupy
        val currentSize = valuesHolder.estimatedSize()
        // If our vector's size has exceeded the threshold, request more memory
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    // Make sure that we have enough memory to store the block. By this point, it is possible that
    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    // perform one final call to attempt to allocate additional memory if necessary.
    if (keepUnrolling) {
      val entryBuilder = valuesHolder.getBuilder()
      val size = entryBuilder.preciseSize
      if (size > unrollMemoryUsedByThisBlock) {
        val amountToRequest = size - unrollMemoryUsedByThisBlock
        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }

      if (keepUnrolling) {
        val entry = entryBuilder.build()
        // Synchronize so that the transfer from unroll memory to storage memory is atomic
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
          val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
          assert(success, "transferring unroll memory to storage memory failed")
        }

        entries.synchronized {
          entries.put(blockId, entry)
        }

        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
          Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(entry.size)
      } else {
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
        Left(unrollMemoryUsedByThisBlock)
      }
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
      Left(unrollMemoryUsedByThisBlock)
    }
  }
```
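The core of putIterator is its growth rule: every memoryCheckPeriod elements it estimates the current size and, once that reaches the threshold, requests currentSize * memoryGrowthFactor - memoryThreshold more bytes, so the new threshold becomes currentSize * memoryGrowthFactor. The standalone sketch below reproduces that loop under simplifying assumptions: unrollSketch and budget are hypothetical names, every element is assumed to cost a fixed bytesPerElement (so the estimate is exact), and a reserve callback stands in for reserveUnrollMemoryForThisTask. Like putIterator, it returns Right(size) on success and Left(reserved unroll memory) on failure.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of putIterator's unrolling loop (hypothetical, no Spark dependency).
// Assumption: each element costs exactly bytesPerElement bytes.
def unrollSketch[T](
    values: Iterator[T],
    bytesPerElement: Long,
    initialThreshold: Long,
    checkPeriod: Int,
    growthFactor: Double,
    reserve: Long => Boolean): Either[Long, Long] = {
  val buffer = ArrayBuffer.empty[T]
  var elementsUnrolled = 0
  var memoryThreshold = initialThreshold
  var reserved = 0L
  var keepUnrolling = reserve(initialThreshold)
  if (keepUnrolling) reserved += initialThreshold

  while (values.hasNext && keepUnrolling) {
    buffer += values.next()
    if (elementsUnrolled % checkPeriod == 0) {
      val currentSize = buffer.size * bytesPerElement
      if (currentSize >= memoryThreshold) {
        // Ask for enough to bring the threshold up to currentSize * growthFactor
        val amountToRequest = (currentSize * growthFactor - memoryThreshold).toLong
        keepUnrolling = reserve(amountToRequest)
        if (keepUnrolling) reserved += amountToRequest
        memoryThreshold += amountToRequest
      }
    }
    elementsUnrolled += 1
  }

  if (keepUnrolling) {
    // Final top-up: the precise size may slightly exceed what has been reserved
    val size = buffer.size * bytesPerElement
    if (size > reserved) {
      keepUnrolling = reserve(size - reserved)
      if (keepUnrolling) reserved = size
    }
    if (keepUnrolling) Right(size) else Left(reserved)
  } else {
    Left(reserved)
  }
}

// Hypothetical per-task unroll budget of `limit` bytes
def budget(limit: Long): Long => Boolean = {
  var used = 0L
  bytes => if (used + bytes <= limit) { used += bytes; true } else false
}

val ok = unrollSketch(Iterator.range(0, 100), 8L, 64L, 16, 1.5, budget(4096L))
val tooBig = unrollSketch(Iterator.range(0, 1000), 8L, 64L, 16, 1.5, budget(1024L))
println(s"ok=$ok tooBig=$tooBig")
```

With these numbers the first block reserves 64, 140, 192 and 384 bytes during the loop and 20 more in the final top-up, ending with Right(800). The second block fails at the check after 112 elements, when it asks for 576 bytes with 780 already reserved, and returns Left(780). Because each request is sized against currentSize * growthFactor, the reservation grows geometrically, so the number of requests stays small relative to the number of elements.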


The private[spark] method evictBlocksToFreeSpace

```scala
  /**
   * Try to evict blocks to free up a given amount of space to store a particular block.
   * Can fail if either the block is bigger than our memory or it would require replacing
   * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
   * RDDs that don't fit into memory that we want to avoid).
   *
   * @param blockId the ID of the block we are freeing space for, if any
   * @param space the size of this block
   * @param memoryMode the type of memory to free (on- or off-heap)
   * @return the amount of memory (in bytes) freed by eviction
   */
  private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      // RDD of the block we are making room for, if any
      val rddToAdd = blockId.flatMap(getRddId)
      // Blocks selected for eviction
      val selectedBlocks = new ArrayBuffer[BlockId]
      // A block is evictable if it uses the requested memory mode and
      // does not belong to the RDD we are adding
      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      }
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized {
        val iterator = entries.entrySet().iterator()
        // Keep scanning while not enough memory has been selected
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          val entry = pair.getValue
          if (blockIsEvictable(blockId, entry)) {
            // We don't want to evict blocks which are currently being read, so we need to obtain
            // an exclusive write lock on blocks which are candidates for eviction. We perform a
            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
            if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
              selectedBlocks += blockId
              freedMemory += pair.getValue.size
            }
          }
        }
      }

      // Drop a single block from memory
      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        }
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
        if (newEffectiveStorageLevel.isValid) {
          // The block is still present in at least one store, so release the lock
          // but don't delete the block info
          blockInfoManager.unlock(blockId)
        } else {
          // The block isn't present in any store, so delete the block info so that the
          // block can be stored again
          blockInfoManager.removeBlock(blockId)
        }
      }

      // Only evict if the selected blocks free enough space
      if (freedMemory >= space) {
        var lastSuccessfulBlock = -1
        try {
          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
            s"(${Utils.bytesToString(freedMemory)} bytes)")
          (0 until selectedBlocks.size).foreach { idx =>
            val blockId = selectedBlocks(idx)
            val entry = entries.synchronized {
              entries.get(blockId)
            }
            // This should never be null as only one task should be dropping
            // blocks and removing entries. However the check is still here for
            // future safety.
            if (entry != null) {
              dropBlock(blockId, entry)
              afterDropAction(blockId)
            }
            lastSuccessfulBlock = idx
          }
          logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
            s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
          freedMemory
        } finally {
          // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
          // with InterruptedException
          if (lastSuccessfulBlock != selectedBlocks.size - 1) {
            // the blocks we didn't process successfully are still locked, so we have to unlock them
            (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
              val blockId = selectedBlocks(idx)
              blockInfoManager.unlock(blockId)
            }
          }
        }
      } else {
        blockId.foreach { id =>
          logInfo(s"Will not store $id")
        }
        selectedBlocks.foreach { id =>
          blockInfoManager.unlock(id)
        }
        0L
      }
    }
  }
```
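The selection phase of evictBlocksToFreeSpace can be sketched on its own. This is a hypothetical model with assumptions stated up front: blocks live in a java.util.LinkedHashMap built in access order (to my knowledge Spark's entries map is built the same way, which makes iteration start at the least recently used block), ToyBlock and selectVictims are invented names, a block's RDD is an Option[Int], and a set of ids that are "being read" stands in for a failed non-blocking lockForWriting. As in the method above, nothing is selected unless the candidates cover the requested space.

```scala
import java.util.LinkedHashMap

final case class ToyBlock(id: String, rddId: Option[Int], size: Long, offHeap: Boolean)

// Pick blocks in iteration (LRU) order until `space` bytes are covered.
// Returns the selected ids, or Nil if not enough space can be freed.
def selectVictims(
    entries: LinkedHashMap[String, ToyBlock],
    space: Long,
    offHeap: Boolean,
    rddToAdd: Option[Int],
    beingRead: Set[String]): List[String] = {
  var freed = 0L
  val selected = List.newBuilder[String]
  val it = entries.entrySet().iterator()
  while (freed < space && it.hasNext) {
    val block = it.next().getValue
    // Same memory mode, and never a block of the RDD we are trying to cache
    val evictable = block.offHeap == offHeap && (rddToAdd.isEmpty || block.rddId != rddToAdd)
    // Stand-in for lockForWriting(blockId, blocking = false): skip blocks being read
    if (evictable && !beingRead.contains(block.id)) {
      selected += block.id
      freed += block.size
    }
  }
  if (freed >= space) selected.result() else Nil
}

// accessOrder = true: get() moves an entry to the end, so iteration starts at the LRU block
val entries = new LinkedHashMap[String, ToyBlock](16, 0.75f, true)
Seq(
  ToyBlock("rdd_1_0", Some(1), 100L, offHeap = false),
  ToyBlock("rdd_2_0", Some(2), 200L, offHeap = false),
  ToyBlock("rdd_2_1", Some(2), 300L, offHeap = false),
  ToyBlock("rdd_3_0", Some(3), 400L, offHeap = false)
).foreach(b => entries.put(b.id, b))
entries.get("rdd_2_0") // touch: rdd_2_0 becomes the most recently used block

val victims = selectVictims(entries, 350L, offHeap = false, rddToAdd = Some(1), beingRead = Set("rdd_2_1"))
val none = selectVictims(entries, 5000L, offHeap = false, rddToAdd = None, beingRead = Set.empty)
println(s"victims=$victims none=$none")
```

Here rdd_1_0 is skipped because it belongs to the RDD being cached, rdd_2_1 is skipped because it is being read, and rdd_3_0 alone covers the 350 bytes. The second call asks for more than all blocks together, so nothing is selected. In the real method the second phase then drops each selected block and, in the finally block, unlocks any selected blocks it did not get to.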


