
Commit

Merge branch 'master' of github.com:apache/spark into demarcate-tests
Andrew Or committed May 28, 2015
2 parents c43ffc4 + 530efe3 commit c7460c0
Showing 18 changed files with 235 additions and 122 deletions.
@@ -100,7 +100,7 @@ sorttable = {
this.removeChild(document.getElementById('sorttable_sortfwdind'));
sortrevind = document.createElement('span');
sortrevind.id = "sorttable_sortrevind";
sortrevind.innerHTML = stIsIE ? '&nbsp<font face="webdings">5</font>' : '&nbsp;&#x25B4;';
sortrevind.innerHTML = stIsIE ? '&nbsp<font face="webdings">5</font>' : '&nbsp;&#x25BE;';
this.appendChild(sortrevind);
return;
}
@@ -113,7 +113,7 @@ sorttable = {
this.removeChild(document.getElementById('sorttable_sortrevind'));
sortfwdind = document.createElement('span');
sortfwdind.id = "sorttable_sortfwdind";
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25BE;';
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25B4;';
this.appendChild(sortfwdind);
return;
}
@@ -134,7 +134,7 @@ sorttable = {
this.className += ' sorttable_sorted';
sortfwdind = document.createElement('span');
sortfwdind.id = "sorttable_sortfwdind";
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25BE;';
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25B4;';
this.appendChild(sortfwdind);

// build an array to sort. This is a Schwartzian transform thing,
129 changes: 106 additions & 23 deletions core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,8 +17,9 @@

package org.apache.spark.serializer

import java.io.{EOFException, InputStream, OutputStream}
import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.reflect.ClassTag

@@ -136,21 +137,45 @@ class KryoSerializer(conf: SparkConf)
}

private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
val output = new KryoOutput(outStream)
class KryoSerializationStream(
serInstance: KryoSerializerInstance,
outStream: OutputStream) extends SerializationStream {

private[this] var output: KryoOutput = new KryoOutput(outStream)
private[this] var kryo: Kryo = serInstance.borrowKryo()

override def writeObject[T: ClassTag](t: T): SerializationStream = {
kryo.writeClassAndObject(output, t)
this
}

override def flush() { output.flush() }
override def close() { output.close() }
override def flush() {
if (output == null) {
throw new IOException("Stream is closed")
}
output.flush()
}

override def close() {
if (output != null) {
try {
output.close()
} finally {
serInstance.releaseKryo(kryo)
kryo = null
output = null
}
}
}
}

private[spark]
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
private val input = new KryoInput(inStream)
class KryoDeserializationStream(
serInstance: KryoSerializerInstance,
inStream: InputStream) extends DeserializationStream {

private[this] var input: KryoInput = new KryoInput(inStream)
private[this] var kryo: Kryo = serInstance.borrowKryo()

override def readObject[T: ClassTag](): T = {
try {
@@ -163,52 +188,105 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}

override def close() {
// Kryo's Input automatically closes the input stream it is using.
input.close()
if (input != null) {
try {
// Kryo's Input automatically closes the input stream it is using.
input.close()
} finally {
serInstance.releaseKryo(kryo)
kryo = null
input = null
}
}
}
}

private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
private val kryo = ks.newKryo()

// Make these lazy vals to avoid creating a buffer unless we use them
/**
* A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
* their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
* pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
* not synchronized.
*/
@Nullable private[this] var cachedKryo: Kryo = borrowKryo()

/**
* Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
* otherwise, it allocates a new instance.
*/
private[serializer] def borrowKryo(): Kryo = {
if (cachedKryo != null) {
val kryo = cachedKryo
// As a defensive measure, call reset() to clear any Kryo state that might have been modified
// by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
kryo.reset()
cachedKryo = null
kryo
} else {
ks.newKryo()
}
}

/**
* Release a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo
* instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later
* re-use.
*/
private[serializer] def releaseKryo(kryo: Kryo): Unit = {
if (cachedKryo == null) {
cachedKryo = kryo
}
}

// Make these lazy vals to avoid creating a buffer unless we use them.
private lazy val output = ks.newKryoOutput()
private lazy val input = new KryoInput()

override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
val kryo = borrowKryo()
try {
kryo.writeClassAndObject(output, t)
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
"increase spark.kryoserializer.buffer.max value.")
} finally {
releaseKryo(kryo)
}
ByteBuffer.wrap(output.toBytes)
}

override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
input.setBuffer(bytes.array)
kryo.readClassAndObject(input).asInstanceOf[T]
val kryo = borrowKryo()
try {
input.setBuffer(bytes.array)
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
releaseKryo(kryo)
}
}

override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
val kryo = borrowKryo()
val oldClassLoader = kryo.getClassLoader
kryo.setClassLoader(loader)
input.setBuffer(bytes.array)
val obj = kryo.readClassAndObject(input).asInstanceOf[T]
kryo.setClassLoader(oldClassLoader)
obj
try {
kryo.setClassLoader(loader)
input.setBuffer(bytes.array)
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
kryo.setClassLoader(oldClassLoader)
releaseKryo(kryo)
}
}

override def serializeStream(s: OutputStream): SerializationStream = {
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
new KryoSerializationStream(kryo, s)
new KryoSerializationStream(this, s)
}

override def deserializeStream(s: InputStream): DeserializationStream = {
new KryoDeserializationStream(kryo, s)
new KryoDeserializationStream(this, s)
}

/**
@@ -218,7 +296,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
def getAutoReset(): Boolean = {
val field = classOf[Kryo].getDeclaredField("autoReset")
field.setAccessible(true)
field.get(kryo).asInstanceOf[Boolean]
val kryo = borrowKryo()
try {
field.get(kryo).asInstanceOf[Boolean]
} finally {
releaseKryo(kryo)
}
}
}

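For context on the KryoSerializer.scala changes above: the single long-lived `kryo` field of `KryoSerializerInstance` is replaced by a size-one borrow/release pool, so serialization and deserialization streams can take a Kryo of their own via `borrowKryo()` and hand it back in `close()` via `releaseKryo()`. The standalone sketch below illustrates that pooling pattern under simplified assumptions; `SizeOnePool` and the example names are illustrative only, not part of Spark.

```scala
// Minimal sketch of a size-one caching pool, mirroring the borrowKryo()/releaseKryo()
// pattern introduced above. Not thread-safe, just like KryoSerializerInstance.
class SizeOnePool[T >: Null <: AnyRef](create: () => T, reset: T => Unit) {
  // The single cached instance, or null while it is currently borrowed.
  private[this] var cached: T = create()

  def borrow(): T = {
    if (cached != null) {
      val obj = cached
      reset(obj)   // defensively clear any state left by the previous borrower
      cached = null
      obj
    } else {
      create()     // pool is empty: allocate a fresh instance
    }
  }

  def release(obj: T): Unit = {
    // Keep at most one instance; anything beyond that is simply discarded.
    if (cached == null) {
      cached = obj
    }
  }
}

object SizeOnePoolExample extends App {
  val pool = new SizeOnePool[StringBuilder](() => new StringBuilder, _.clear())
  val a = pool.borrow()  // takes the cached instance
  val b = pool.borrow()  // cache is empty, so a new instance is created
  pool.release(a)        // cached for the next borrower
  pool.release(b)        // discarded: the pool already holds one
}
```

The same discipline is what lets `serialize` and `deserialize` above wrap their work in try/finally, so the Kryo is returned to the pool even when a buffer overflow is rethrown as a SparkException suggesting a larger spark.kryoserializer.buffer.max.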
@@ -19,6 +19,7 @@ package org.apache.spark.serializer

import java.io._
import java.nio.ByteBuffer
import javax.annotation.concurrent.NotThreadSafe

import scala.reflect.ClassTag

@@ -114,8 +115,12 @@ object Serializer {
/**
* :: DeveloperApi ::
* An instance of a serializer, for use by one thread at a time.
*
* It is legal to create multiple serialization / deserialization streams from the same
* SerializerInstance as long as those streams are all used within the same thread.
*/
@DeveloperApi
@NotThreadSafe
abstract class SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer

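The Serializer.scala hunk above adds `@NotThreadSafe` and spells out the contract: a `SerializerInstance` must be confined to a single thread, but it may create any number of serialization / deserialization streams on that thread. Below is a minimal usage sketch assuming Spark is on the classpath; `writeBatches` is an illustrative helper, not part of the Spark API.

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.serializer.Serializer

object ThreadConfinedSerializationExample {
  // One SerializerInstance per thread, many streams within that thread.
  def writeBatches(serializer: Serializer, batches: Seq[Seq[String]]): Seq[Array[Byte]] = {
    // newInstance() is called once on this thread; the instance must not be shared
    // across threads (@NotThreadSafe), but multiple streams from it are allowed here.
    val instance = serializer.newInstance()
    batches.map { batch =>
      val bytes = new ByteArrayOutputStream()
      val stream = instance.serializeStream(bytes)
      try {
        batch.foreach(stream.writeObject(_))
      } finally {
        stream.close()
      }
      bytes.toByteArray
    }
  }
}
```

Under these assumptions the helper works with any concrete serializer, for example `writeBatches(new KryoSerializer(new SparkConf), batches)` invoked from within a single task thread.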
@@ -28,11 +28,13 @@ import scala.collection.mutable.ArrayBuffer
* occupy a contiguous segment of memory.
*/
private[spark] class ChainedBuffer(chunkSize: Int) {
private val chunkSizeLog2 = (math.log(chunkSize) / math.log(2)).toInt
assert(math.pow(2, chunkSizeLog2).toInt == chunkSize,

private val chunkSizeLog2: Int = java.lang.Long.numberOfTrailingZeros(
java.lang.Long.highestOneBit(chunkSize))
assert((1 << chunkSizeLog2) == chunkSize,
s"ChainedBuffer chunk size $chunkSize must be a power of two")
private val chunks: ArrayBuffer[Array[Byte]] = new ArrayBuffer[Array[Byte]]()
private var _size: Int = _
private var _size: Long = 0

/**
* Feed bytes from this buffer into a BlockObjectWriter.
@@ -41,16 +43,16 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
* @param os OutputStream to read into.
* @param len Number of bytes to read.
*/
def read(pos: Int, os: OutputStream, len: Int): Unit = {
def read(pos: Long, os: OutputStream, len: Int): Unit = {
if (pos + len > _size) {
throw new IndexOutOfBoundsException(
s"Read of $len bytes at position $pos would go past size ${_size} of buffer")
}
var chunkIndex = pos >> chunkSizeLog2
var posInChunk = pos - (chunkIndex << chunkSizeLog2)
var written = 0
var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
var written: Int = 0
while (written < len) {
val toRead = math.min(len - written, chunkSize - posInChunk)
val toRead: Int = math.min(len - written, chunkSize - posInChunk)
os.write(chunks(chunkIndex), posInChunk, toRead)
written += toRead
chunkIndex += 1
@@ -66,16 +68,16 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
* @param offs Offset in the byte array to read to.
* @param len Number of bytes to read.
*/
def read(pos: Int, bytes: Array[Byte], offs: Int, len: Int): Unit = {
def read(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = {
if (pos + len > _size) {
throw new IndexOutOfBoundsException(
s"Read of $len bytes at position $pos would go past size of buffer")
}
var chunkIndex = pos >> chunkSizeLog2
var posInChunk = pos - (chunkIndex << chunkSizeLog2)
var written = 0
var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
var written: Int = 0
while (written < len) {
val toRead = math.min(len - written, chunkSize - posInChunk)
val toRead: Int = math.min(len - written, chunkSize - posInChunk)
System.arraycopy(chunks(chunkIndex), posInChunk, bytes, offs + written, toRead)
written += toRead
chunkIndex += 1
@@ -91,22 +93,22 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
* @param offs Offset in the byte array to write from.
* @param len Number of bytes to write.
*/
def write(pos: Int, bytes: Array[Byte], offs: Int, len: Int): Unit = {
def write(pos: Long, bytes: Array[Byte], offs: Int, len: Int): Unit = {
if (pos > _size) {
throw new IndexOutOfBoundsException(
s"Write at position $pos starts after end of buffer ${_size}")
}
// Grow if needed
val endChunkIndex = (pos + len - 1) >> chunkSizeLog2
val endChunkIndex: Int = ((pos + len - 1) >> chunkSizeLog2).toInt
while (endChunkIndex >= chunks.length) {
chunks += new Array[Byte](chunkSize)
}

var chunkIndex = pos >> chunkSizeLog2
var posInChunk = pos - (chunkIndex << chunkSizeLog2)
var written = 0
var chunkIndex: Int = (pos >> chunkSizeLog2).toInt
var posInChunk: Int = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
var written: Int = 0
while (written < len) {
val toWrite = math.min(len - written, chunkSize - posInChunk)
val toWrite: Int = math.min(len - written, chunkSize - posInChunk)
System.arraycopy(bytes, offs + written, chunks(chunkIndex), posInChunk, toWrite)
written += toWrite
chunkIndex += 1
@@ -119,19 +121,19 @@ private[spark] class ChainedBuffer(chunkSize: Int) {
/**
* Total size of buffer that can be written to without allocating additional memory.
*/
def capacity: Int = chunks.size * chunkSize
def capacity: Long = chunks.size.toLong * chunkSize

/**
* Size of the logical buffer.
*/
def size: Int = _size
def size: Long = _size
}

/**
* Output stream that writes to a ChainedBuffer.
*/
private[spark] class ChainedBufferOutputStream(chainedBuffer: ChainedBuffer) extends OutputStream {
private var pos = 0
private var pos: Long = 0

override def write(b: Int): Unit = {
throw new UnsupportedOperationException()
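The ChainedBuffer.scala changes widen positions and sizes from Int to Long and derive `chunkSizeLog2` with integer bit operations instead of a floating-point logarithm, which can be off by one after rounding. The address arithmetic the new code relies on is summarized in the standalone sketch below; `ChunkMath` and its method names are illustrative only.

```scala
object ChunkMath {
  // For a power-of-two chunk size, log2 via bit operations is exact,
  // unlike (math.log(chunkSize) / math.log(2)).toInt, which can round incorrectly.
  def log2OfPowerOfTwo(chunkSize: Int): Int = {
    require(chunkSize > 0 && (chunkSize & (chunkSize - 1)) == 0,
      s"chunk size $chunkSize must be a power of two")
    java.lang.Integer.numberOfTrailingZeros(chunkSize)
  }

  // Split a Long byte offset into (chunk index, offset within that chunk).
  // Both results fit in an Int as long as each chunk is smaller than 2 GB
  // and there are fewer than 2^31 chunks.
  def locate(pos: Long, chunkSizeLog2: Int): (Int, Int) = {
    val chunkIndex = (pos >> chunkSizeLog2).toInt
    val posInChunk = (pos - (chunkIndex.toLong << chunkSizeLog2)).toInt
    (chunkIndex, posInChunk)
  }
}

// Example: with 4 KiB chunks, byte offset 10000 lives in chunk 2 at offset 1808:
// ChunkMath.locate(10000L, ChunkMath.log2OfPowerOfTwo(4096)) == (2, 1808)
```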