Skip to content

Commit

Permalink
[SPARK-5307] Add a config option for SerializationDebugger.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Jan 31, 2015
1 parent 740a568 commit f1d4629
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils

private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
extends SerializationStream {
private val objOut = new ObjectOutputStream(out)
private var counter = 0
Expand All @@ -42,7 +43,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
try {
objOut.writeObject(t)
} catch {
case e: NotSerializableException =>
case e: NotSerializableException if extraDebugInfo =>
throw SerializationDebugger.improveException(t, e)
}
counter += 1
Expand All @@ -69,7 +70,8 @@ extends DeserializationStream {
}


private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader)
private[spark] class JavaSerializerInstance(
counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)
extends SerializerInstance {

override def serialize[T: ClassTag](t: T): ByteBuffer = {
Expand All @@ -93,7 +95,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade
}

override def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s, counterReset)
new JavaSerializationStream(s, counterReset, extraDebugInfo)
}

override def deserializeStream(s: InputStream): DeserializationStream = {
Expand All @@ -116,17 +118,20 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade
@DeveloperApi
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)

override def newInstance(): SerializerInstance = {
val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
new JavaSerializerInstance(counterReset, classLoader)
new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader)
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(counterReset)
out.writeBoolean(extraDebugInfo)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
counterReset = in.readInt()
extraDebugInfo = in.readBoolean()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ private[serializer] object SerializationDebugger extends Logging {

/**
* Improve the given NotSerializableException with the serialization path leading from the given
* object to the problematic object.
* object to the problematic object. This is turned off automatically if
* `sun.io.serialization.extendedDebugInfo` flag is turned on for the JVM.
*/
def improveException(obj: Any, e: NotSerializableException): NotSerializableException = {
if (enableDebugging && reflect != null) {
Expand Down

0 comments on commit f1d4629

Please sign in to comment.