
putValues -> putIterator + putArray
andrewor14 committed Jul 24, 2014
1 parent beb368f commit f9ff82e
Showing 5 changed files with 20 additions and 20 deletions.
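In short: BlockStore previously exposed two overloads of putValues, one taking Iterator[Any] and one taking Array[Any]. This commit renames them to putIterator and putArray respectively, and updates the implementations (DiskStore, MemoryStore, TachyonStore) and all call sites in BlockManager. It is a pure rename with no behavior change, which is why the diff is exactly 20 additions against 20 deletions.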
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (10 changes: 5 additions & 5 deletions)
@@ -463,10 +463,10 @@ private[spark] class BlockManager(
           val values = dataDeserialize(blockId, bytes)
           if (level.deserialized) {
             // Cache the values before returning them
-            val putResult = memoryStore.putValues(
+            val putResult = memoryStore.putIterator(
               blockId, values, level, returnValues = true, allowPersistToDisk = false)
             // The put may or may not have succeeded, depending on whether there was enough
-            // space to unroll the block. Either way, putValues should return an iterator.
+            // space to unroll the block. Either way, the put here should return an iterator.
             putResult.data match {
               case Left(it) =>
                 return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
@@ -716,9 +716,9 @@ private[spark] class BlockManager(
         // Actually put the values
         val result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putValues(blockId, iterator, putLevel, returnValues)
+            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
           case ArrayValues(array) =>
-            blockStore.putValues(blockId, array, putLevel, returnValues)
+            blockStore.putArray(blockId, array, putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
             blockStore.putBytes(blockId, bytes, putLevel)
@@ -873,7 +873,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data match {
         case Left(elements) =>
-          diskStore.putValues(blockId, elements, level, returnValues = false)
+          diskStore.putArray(blockId, elements, level, returnValues = false)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }
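The dispatch in the second hunk now reads one case per method: IteratorValues goes to putIterator, ArrayValues to putArray, and ByteBufferValues to putBytes. For context, a sketch of the BlockValues case classes implied by those patterns (only the case-class names appear in the diff; the field names here are assumptions inferred from how each case is used):

// Assumed shape of the BlockValues hierarchy matched in the put path above;
// field names are illustrative, not taken from the diff.
private[spark] sealed trait BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
private[spark] case class ByteBufferValues(buffer: java.nio.ByteBuffer) extends BlockValues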
core/src/main/scala/org/apache/spark/storage/BlockStore.scala (4 changes: 2 additions & 2 deletions)
@@ -37,13 +37,13 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
    * @return a PutResult that contains the size of the data, as well as the values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
-  def putValues(
+  def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult
 
-  def putValues(
+  def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
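For reference, a minimal sketch of the BlockStore put surface after this change, mirroring the signatures in the hunk above (other members such as putBytes are unchanged by this commit and elided here):

// Sketch of the renamed interface; signatures match the diff above.
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {

  // Store a block whose values arrive as a one-pass iterator.
  def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult

  // Store a block whose values are already materialized in memory.
  def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult
}

The overloads previously shared the name putValues and differed only in parameter type; the rename makes each call site say which input form it is handing over.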
core/src/main/scala/org/apache/spark/storage/DiskStore.scala (6 changes: 3 additions & 3 deletions)
@@ -55,15 +55,15 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
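Note the delegation: DiskStore implements putArray by converting the array to an iterator and forwarding to putIterator, so the disk write path is implemented only once. TachyonStore (below) does the same. MemoryStore works in the opposite direction: its putIterator unrolls the iterator and dispatches to putArray when the values fit in memory, as the next file shows.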
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala (14 changes: 7 additions & 7 deletions)
@@ -78,14 +78,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putValues(blockId, values, level, returnValues = true)
+      putIterator(blockId, values, level, returnValues = true)
     } else {
       val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
@@ -101,12 +101,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values, level, returnValues, allowPersistToDisk = true)
+    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
   }
 
   /**
@@ -121,7 +121,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * back from disk and attempts to cache it in memory. In this case, we should not persist the
    * block back on disk again, as it is already in disk store.
    */
-  private[storage] def putValues(
+  private[storage] def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
@@ -132,7 +132,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = putValues(blockId, arrayValues, level, returnValues)
+        val res = putArray(blockId, arrayValues, level, returnValues)
         droppedBlocks ++= res.droppedBlocks
         PutResult(res.size, res.data, droppedBlocks)
       case Right(iteratorValues) =>
@@ -141,7 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         s"Free memory is $freeMemory bytes.")
       if (level.useDisk && allowPersistToDisk) {
         logWarning(s"Persisting block $blockId to disk instead.")
-        val res = blockManager.diskStore.putValues(blockId, iteratorValues, level, returnValues)
+        val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
         PutResult(res.size, res.data, droppedBlocks)
       } else {
         PutResult(0, Left(iteratorValues), droppedBlocks)
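The renamed private putIterator is the interesting path: it unrolls the iterator, then dispatches on the result. A condensed sketch of that control flow, pieced together from the hunks above (the unrollSafely helper and the exact buffer type are assumptions; they do not appear in this diff):

import scala.collection.mutable.ArrayBuffer

private[storage] def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean,
    allowPersistToDisk: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  // Try to unroll the iterator into memory without exhausting the heap
  // (unrollSafely is an assumed helper returning Either[Array[Any], Iterator[Any]])
  val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
  unrolledValues match {
    case Left(arrayValues) =>
      // Fully unrolled: store via the array path
      val res = putArray(blockId, arrayValues, level, returnValues)
      droppedBlocks ++= res.droppedBlocks
      PutResult(res.size, res.data, droppedBlocks)
    case Right(iteratorValues) =>
      // Not enough memory to unroll; optionally fall back to disk
      if (level.useDisk && allowPersistToDisk) {
        val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
        PutResult(res.size, res.data, droppedBlocks)
      } else {
        // Hand the iterator back so the caller can still consume the values
        PutResult(0, Left(iteratorValues), droppedBlocks)
      }
  }
}

This matches the comment in BlockManager above: when the memory put cannot unroll the block and disk fallback is disallowed (allowPersistToDisk = false), the caller still receives an iterator via PutResult.data.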
core/src/main/scala/org/apache/spark/storage/TachyonStore.scala (6 changes: 3 additions & 3 deletions)
@@ -43,15 +43,15 @@ private[spark] class TachyonStore(
     putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
