diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 690c8f32a705e..d746526639e58 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -463,10 +463,10 @@ private[spark] class BlockManager(
           val values = dataDeserialize(blockId, bytes)
           if (level.deserialized) {
             // Cache the values before returning them
-            val putResult = memoryStore.putValues(
+            val putResult = memoryStore.putIterator(
               blockId, values, level, returnValues = true, allowPersistToDisk = false)
             // The put may or may not have succeeded, depending on whether there was enough
-            // space to unroll the block. Either way, putValues should return an iterator.
+            // space to unroll the block. Either way, the put here should return an iterator.
             putResult.data match {
               case Left(it) =>
                 return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
@@ -716,9 +716,9 @@ private[spark] class BlockManager(
         // Actually put the values
         val result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putValues(blockId, iterator, putLevel, returnValues)
+            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
           case ArrayValues(array) =>
-            blockStore.putValues(blockId, array, putLevel, returnValues)
+            blockStore.putArray(blockId, array, putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
             blockStore.putBytes(blockId, bytes, putLevel)
@@ -873,7 +873,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data match {
         case Left(elements) =>
-          diskStore.putValues(blockId, elements, level, returnValues = false)
+          diskStore.putArray(blockId, elements, level, returnValues = false)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 68fd2b2156ee7..69985c9759e2d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -37,13 +37,13 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
    * @return a PutResult that contains the size of the data, as well as the values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
-  def putValues(
+  def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult
 
-  def putValues(
+  def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 422179b44b189..c83261dd91b36 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -55,15 +55,15 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index cac7dcfbde0bc..e11504fb4018e 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -78,14 +78,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putValues(blockId, values, level, returnValues = true)
+      putIterator(blockId, values, level, returnValues = true)
     } else {
       val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
@@ -101,12 +101,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values, level, returnValues, allowPersistToDisk = true)
+    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
   }
 
   /**
@@ -121,7 +121,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * back from disk and attempts to cache it in memory. In this case, we should not persist the
    * block back on disk again, as it is already in disk store.
    */
-  private[storage] def putValues(
+  private[storage] def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
@@ -132,7 +132,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = putValues(blockId, arrayValues, level, returnValues)
+        val res = putArray(blockId, arrayValues, level, returnValues)
         droppedBlocks ++= res.droppedBlocks
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
@@ -141,7 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           s"Free memory is $freeMemory bytes.")
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
-          val res = blockManager.diskStore.putValues(blockId, iteratorValues, level, returnValues)
+          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
           PutResult(res.size, res.data, droppedBlocks)
         } else {
           PutResult(0, Left(iteratorValues), droppedBlocks)
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index c843a50b70304..932b5616043b4 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -43,15 +43,15 @@ private[spark] class TachyonStore(
     putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
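For readers skimming the diff: the change splits the overloaded putValues into two intention-revealing methods, putIterator and putArray, with the array variant able to delegate to the iterator variant (as DiskStore and TachyonStore do above). Below is a minimal, self-contained sketch of that shape; it is not Spark code, and every name in it (PutRenameSketch, SketchStore, SketchResult, InMemorySketchStore) is a hypothetical stand-in.

// Sketch only: illustrates the putIterator/putArray split, assuming made-up types.
object PutRenameSketch {

  final case class SketchResult(size: Long, data: Either[Iterator[Any], Array[Byte]])

  trait SketchStore {
    // After the rename, the iterator and array paths are distinct methods
    // rather than two overloads of a single putValues.
    def putIterator(blockId: String, values: Iterator[Any]): SketchResult
    def putArray(blockId: String, values: Array[Any]): SketchResult
  }

  class InMemorySketchStore extends SketchStore {
    override def putIterator(blockId: String, values: Iterator[Any]): SketchResult = {
      // Materialize the iterator, analogous to unrolling a block in memory.
      val materialized = values.toArray
      SketchResult(materialized.length, Left(materialized.iterator))
    }
    // The array path delegates to the iterator path, mirroring the stores above.
    override def putArray(blockId: String, values: Array[Any]): SketchResult =
      putIterator(blockId, values.toIterator)
  }

  def main(args: Array[String]): Unit = {
    val store = new InMemorySketchStore
    // Call sites now name the container they hold, mirroring the data match
    // on IteratorValues / ArrayValues in BlockManager.doPut.
    val fromIterator = store.putIterator("block-1", Iterator(1, 2, 3))
    val fromArray = store.putArray("block-2", Array[Any]("a", "b"))
    println(s"iterator put size: ${fromIterator.size}, array put size: ${fromArray.size}")
  }
}

The distinction matters because the two paths behave differently in MemoryStore: putIterator may only partially unroll a block when memory runs short, while putArray receives values that are already fully materialized, so naming them separately makes the memory semantics explicit at each call site.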