Skip to content

Commit

Permalink
[SPARK-12074] Avoid memory copy involving ByteBuffer.wrap(ByteArrayOu…
Browse files Browse the repository at this point in the history
…tputStream.toByteArray)

SPARK-12060 fixed JavaSerializerInstance.serialize
This PR applies the same technique on two other classes.

zsxwing

Author: tedyu <[email protected]>

Closes apache#10177 from tedyu/master.
  • Loading branch information
tedyu authored and zsxwing committed Dec 8, 2015
1 parent 6cb06e8 commit 75c60bf
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}


/**
Expand Down Expand Up @@ -172,7 +171,7 @@ private[spark] object Task {
serializer: SerializerInstance)
: ByteBuffer = {

val out = new ByteArrayOutputStream(4096)
val out = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(out)

// Write currentFiles
Expand All @@ -193,7 +192,7 @@ private[spark] object Task {
dataOut.flush()
val taskBytes = serializer.serialize(task)
Utils.writeByteBuffer(taskBytes, out)
ByteBuffer.wrap(out.toByteArray)
out.toByteBuffer
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,9 +1202,9 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new ByteArrayOutputStream(4096)
val byteStream = new ByteBufferOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
ByteBuffer.wrap(byteStream.toByteArray)
byteStream.toByteBuffer
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import java.nio.ByteBuffer
/**
* Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
*/
private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) {

def this() = this(32)

def toByteBuffer: ByteBuffer = {
return ByteBuffer.wrap(buf, 0, count)
Expand Down

0 comments on commit 75c60bf

Please sign in to comment.