Skip to content

Commit

Permalink
SPARK-18617: Close "kryo auto pick" feature for Spark Streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
uncleGen committed Nov 29, 2016
1 parent 71352c9 commit d205ebd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}

def getSerializer(ct: ClassTag[_]): Serializer = {
if (canUseKryo(ct)) {
def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
if (autoPick && canUseKryo(ct)) {
kryoSerializer
} else {
defaultSerializer
Expand Down Expand Up @@ -155,7 +155,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
outputStream: OutputStream,
values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
val ser = blockId match {
case a: StreamBlockId =>
getSerializer(implicitly[ClassTag[T]], autoPick = false).newInstance()
case _ =>
getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
}
ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
}

Expand All @@ -171,7 +176,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
classTag: ClassTag[_]): ChunkedByteBuffer = {
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
val byteStream = new BufferedOutputStream(bbos)
val ser = getSerializer(classTag).newInstance()
val ser = blockId match {
case a: StreamBlockId =>
getSerializer(classTag, autoPick = false).newInstance()
case _ =>
getSerializer(classTag, autoPick = true).newInstance()
}
ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
bbos.toChunkedByteBuffer
}
Expand All @@ -185,8 +195,14 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
inputStream: InputStream)
(classTag: ClassTag[T]): Iterator[T] = {
val stream = new BufferedInputStream(inputStream)
getSerializer(classTag)
.newInstance()
val ser = blockId match {
case a: StreamBlockId =>
getSerializer(classTag, autoPick = false)
case _ =>
getSerializer(classTag, autoPick = true)
}

ser.newInstance()
.deserializeStream(wrapStream(blockId, stream))
.asIterator.asInstanceOf[Iterator[T]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
Expand Down Expand Up @@ -334,7 +334,12 @@ private[spark] class MemoryStore(
val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val ser = serializerManager.getSerializer(classTag).newInstance()
val ser = blockId match {
case a: StreamBlockId =>
serializerManager.getSerializer(classTag, autoPick = false).newInstance()
case _ =>
serializerManager.getSerializer(classTag, autoPick = true).newInstance()
}
ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
spy
}

val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
val serializer = serializerManager
.getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
redirectableOutputStream.setOutputStream(bbos)
val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
Expand Down Expand Up @@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
Mockito.verifyNoMoreInteractions(memoryStore)
Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()

val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
val serializer = serializerManager
.getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
val deserialized =
serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
assert(deserialized === items)
Expand Down

0 comments on commit d205ebd

Please sign in to comment.