Modify BlockManager.dataSerialize to write ChunkedByteBuffers.
JoshRosen committed Mar 15, 2016
1 parent 3fbec21 commit e5e663f
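In outline: dataSerialize previously buffered an entire serialized block in one growing contiguous buffer; after this commit it streams the serialized bytes into fixed-size 4 MB chunks via ByteArrayChunkOutputStream and wraps each chunk in a ByteBuffer, so no single allocation the size of the whole block is needed. Below is a minimal sketch of that chunking idea; ChunkingOutputStream is a hypothetical, simplified stand-in for Spark's ByteArrayChunkOutputStream, not the class from this patch.

import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ByteArrayChunkOutputStream: accumulates bytes
// into fixed-size chunks instead of one growing contiguous array.
class ChunkingOutputStream(chunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer[Array[Byte]]()
  private var pos = chunkSize // forces allocation of the first chunk on write

  override def write(b: Int): Unit = {
    if (pos == chunkSize) {
      chunks += new Array[Byte](chunkSize)
      pos = 0
    }
    chunks.last(pos) = b.toByte
    pos += 1
  }

  // Return the chunks, trimming the last one to the bytes actually written.
  def toArrays: Array[Array[Byte]] = {
    if (chunks.isEmpty) Array.empty
    else (chunks.init :+ chunks.last.take(pos)).toArray
  }
}

object ChunkingDemo extends App {
  val out = new ChunkingOutputStream(4)
  out.write(Array[Byte](1, 2, 3, 4, 5, 6)) // spans two chunks
  val buffers: Array[ByteBuffer] = out.toArrays.map(ByteBuffer.wrap)
  println(buffers.map(_.remaining).mkString(", ")) // prints: 4, 2
}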
Showing 4 changed files with 19 additions and 12 deletions.
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (14 changes: 7 additions & 7 deletions)

@@ -42,7 +42,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
 import org.apache.spark.util._
-import org.apache.spark.util.io.ChunkedByteBuffer
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -481,7 +481,7 @@ private[spark] class BlockManager(
       diskStore.getBytes(blockId)
     } else if (level.useMemory && memoryStore.contains(blockId)) {
       // The block was not found on disk, so serialize an in-memory copy:
-      new ChunkedByteBuffer(dataSerialize(blockId, memoryStore.getValues(blockId).get))
+      dataSerialize(blockId, memoryStore.getValues(blockId).get)
     } else {
       releaseLock(blockId)
       throw new SparkException(s"Block $blockId was not found even though it's read-locked")
@@ -1281,11 +1281,11 @@ private[spark] class BlockManager(
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
-  /** Serializes into a byte buffer. */
-  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
-    val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values)
-    byteStream.toByteBuffer
+  /** Serializes into a chunked byte buffer. */
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
+    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
+    dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
+    new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
   }
 
   /**
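Two details of the new dataSerialize worth noting: the chunk size is fixed at 4 MB (1024 * 1024 * 4), and toArrays trims the final chunk, so a block only pays for the bytes actually written into its last chunk. A back-of-envelope illustration for a hypothetical 10 MB serialized block (not code from the patch):

object ChunkMath extends App {
  // Chunk layout for a hypothetical 10 MB serialized block under the
  // 4 MB chunk size chosen in dataSerialize above.
  val chunkSize = 1024 * 1024 * 4
  val blockSize = 10 * 1024 * 1024
  val numChunks = (blockSize + chunkSize - 1) / chunkSize
  val lastChunkBytes = blockSize - (numChunks - 1) * chunkSize
  // Previously: one contiguous 10 MB ByteBuffer grown from a 4 KB initial buffer.
  println(s"$numChunks chunks, last chunk ${lastChunkBytes / (1024 * 1024)} MB") // 3 chunks, last chunk 2 MB
}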
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala (3 changes: 1 addition & 2 deletions)

@@ -188,8 +188,7 @@ private[spark] class MemoryStore(
     val entry = if (level.deserialized) {
       new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
     } else {
-      // TODO(josh): incrementally serialize
-      val bytes = new ChunkedByteBuffer(blockManager.dataSerialize(blockId, arrayValues.iterator))
+      val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
       new SerializedMemoryEntry(bytes, bytes.limit)
     }
     val size = entry.size
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala (8 changes: 8 additions & 0 deletions)

@@ -62,6 +62,14 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     byteChannel.getData
   }
 
+  def toByteBuffer: ByteBuffer = {
+    if (chunks.length == 1) {
+      chunks.head
+    } else {
+      ByteBuffer.wrap(toArray)
+    }
+  }
+
   def toInputStream(dispose: Boolean = false): InputStream = {
     new ChunkedByteBufferInputStream(this, dispose)
   }
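One caveat in the new toByteBuffer, as far as the diff shows: the single-chunk branch returns the underlying chunk itself (the caller shares its position and limit), while the multi-chunk branch copies everything into one freshly wrapped array. A self-contained sketch of those two paths against plain ByteBuffers, not Spark's class; the inline merge stands in for what toArray does:

import java.nio.ByteBuffer

object ToByteBufferSketch extends App {
  // Mirrors the two branches of ChunkedByteBuffer.toByteBuffer above,
  // making the copy vs. no-copy split visible.
  def toByteBuffer(chunks: Array[ByteBuffer]): ByteBuffer = {
    if (chunks.length == 1) {
      chunks.head // no copy; position and limit are shared with the chunk
    } else {
      val merged = ByteBuffer.allocate(chunks.map(_.remaining).sum)
      chunks.foreach(c => merged.put(c.duplicate())) // duplicate() leaves source positions untouched
      merged.flip()
      merged
    }
  }

  val single = toByteBuffer(Array(ByteBuffer.wrap(Array[Byte](1, 2))))
  val multi = toByteBuffer(Array(ByteBuffer.wrap(Array[Byte](1)), ByteBuffer.wrap(Array[Byte](2))))
  println(s"${single.remaining} bytes, ${multi.remaining} bytes") // 2 bytes, 2 bytes
}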
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala (6 changes: 3 additions & 3 deletions)

@@ -179,7 +179,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>
-        byteBuffer
+        new ChunkedByteBuffer(byteBuffer.duplicate())
       case _ =>
         throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
     }
@@ -188,7 +188,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     val storeInBlockManagerFuture = Future {
       val putSucceeded = blockManager.putBytes(
         blockId,
-        new ChunkedByteBuffer(serializedBlock.duplicate()),
+        serializedBlock,
         effectiveStorageLevel,
         tellMaster = true)
       if (!putSucceeded) {
@@ -199,7 +199,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in write ahead log
     val storeInWriteAheadLogFuture = Future {
-      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
+      writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
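The toByteBuffer call in the last hunk exists because the write-ahead log still takes a contiguous java.nio.ByteBuffer, so the chunked representation is flattened only at the WAL boundary, while blockManager.putBytes now accepts the ChunkedByteBuffer directly. A minimal sketch of the surrounding fan-out pattern, with placeholder bodies standing in for blockManager.putBytes and writeAheadLog.write (the real code returns the WAL record handle from the combined future):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelStoreSketch extends App {
  // The block-manager write and the write-ahead-log write run in parallel;
  // we block until both finish, as the handler's final comment describes.
  val storeInBlockManagerFuture = Future { true }               // placeholder: putSucceeded
  val storeInWriteAheadLogFuture = Future { "walRecordHandle" } // placeholder: record handle
  val combined = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture)
  val (putSucceeded, handle) = Await.result(combined, 30.seconds)
  println(s"putSucceeded=$putSucceeded, handle=$handle")
}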
