Skip to content

Commit

Permalink
[SPARK-5307] SerializationDebugger to help debug NotSerializableExcep…
Browse files Browse the repository at this point in the history
…tion.

This patch adds a SerializationDebugger that is used to add more information to
a NotSerializableException. When a NotSerializableException is encountered, the
debugger tries to serialize the object one more time through a DebugStream that
hooks into the internals of ObjectOutputStream to get the serialization stack.

An example output looks like this:

org.apache.spark.serializer.NotSerializableClass
	Serialization stack (3):
	- org.apache.spark.serializer.NotSerializableClass@5e20dc10 (class org.apache.spark.serializer.NotSerializableClass)
	- org.apache.spark.serializer.SerializableClass2@521fb14e (class org.apache.spark.serializer.SerializableClass2)
	- org.apache.spark.serializer.SerializableClass1@5f54e92c (class org.apache.spark.serializer.SerializableClass1)
	Run the JVM with sun.io.serialization.extendedDebugInfo for more information.
  • Loading branch information
rxin committed Jan 18, 2015
1 parent e7884bc commit f7e6320
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
SerializationDebugger.writeObject(objOut, t)
counter += 1
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io._
import java.lang.reflect.Field
import java.security.AccessController

import scala.collection.mutable
import scala.util.control.NonFatal


private[serializer]
object SerializationDebugger {

  /**
   * Write an object to the [[ObjectOutputStream]]. If a NotSerializableException is encountered,
   * use our debug stream to capture the serialization stack leading to the problematic object.
   *
   * @param out stream the object is written to
   * @param obj object to serialize
   * @throws NotSerializableException the original exception when debugging is disabled, or a
   *                                  new one whose message also lists the serialization stack
   */
  def writeObject(out: ObjectOutputStream, obj: Any): Unit = {
    try {
      out.writeObject(obj)
    } catch {
      case e: NotSerializableException =>
        throw (if (enableDebugging) improveException(obj, e) else e)
    }
  }

  /**
   * Improve the given NotSerializableException with the serialization stack leading from the given
   * object to the problematic object.
   *
   * This is strictly best effort: if the private depth field could not be resolved, if the second
   * serialization attempt unexpectedly succeeds, or if anything non-fatal goes wrong while
   * collecting or rendering the stack, the original exception `e` is returned unchanged. Fatal
   * errors (as classified by [[NonFatal]]) are deliberately not swallowed.
   *
   * @param obj root object whose serialization failed
   * @param e   exception raised by the first serialization attempt
   * @return an exception carrying the serialization stack, or `e` when none could be produced
   */
  private def improveException(obj: Any, e: NotSerializableException): NotSerializableException = {
    if (depthField == null) {
      // Without access to ObjectOutputStream's depth counter the stack cannot be reconstructed.
      e
    } else {
      try {
        // Constructed inside the try so that a failure to set up the debug stream cannot replace
        // the exception the caller actually cares about.
        val out = new DebugStream(new ByteArrayOutputStream)
        try {
          out.writeObject(obj)
          e
        } catch {
          case nse: NotSerializableException =>
            new NotSerializableException(
              nse.getMessage + "\n" +
              s"\tSerialization stack (${out.stack.size}):\n" +
              out.stack.map(o => describe(o)).mkString("\n") + "\n" +
              "\tRun the JVM with sun.io.serialization.extendedDebugInfo for more information.")
        }
      } catch {
        case NonFatal(_) => e
      }
    }
  }

  /**
   * Render one entry of the serialization stack. The entry's `toString` is user code and may
   * itself throw; in that case a placeholder is used so the rest of the stack is still reported.
   */
  private def describe(o: Object): String = {
    val rendered = try {
      String.valueOf(o)
    } catch {
      case NonFatal(t) => s"<toString failed: ${t.getClass.getName}>"
    }
    s"\t- $rendered (class ${o.getClass.getName})"
  }

  /**
   * Reference to the private depth field in ObjectOutputStream, or null when it cannot be
   * looked up or made accessible (in which case exceptions are passed through unimproved).
   */
  private val depthField: Field = try {
    val f = classOf[ObjectOutputStream].getDeclaredField("depth")
    f.setAccessible(true)
    f
  } catch {
    case NonFatal(_) => null
  }

  /**
   * Whether to enable this debugging or not. By default, the special debugging feature is disabled
   * if the JVM is run with sun.io.serialization.extendedDebugInfo.
   *
   * The system property is read through the public `Boolean.getBoolean` API inside a privileged
   * action, rather than through a JDK-internal `sun.*` helper class.
   */
  private[serializer] var enableDebugging: Boolean = {
    val extendedDebugInfo = AccessController.doPrivileged(
      new java.security.PrivilegedAction[java.lang.Boolean] {
        override def run(): java.lang.Boolean =
          java.lang.Boolean.valueOf(
            java.lang.Boolean.getBoolean("sun.io.serialization.extendedDebugInfo"))
      })
    !extendedDebugInfo.booleanValue()
  }

  /**
   * An [[ObjectOutputStream]] that collects the serialization stack when a NotSerializableException
   * is thrown.
   *
   * This works by hooking into ObjectOutputStream internals using replaceObject method and the
   * private depth field. Inspired by Bob Lee's DebuggingObjectOutputStream.
   */
  private class DebugStream(underlying: OutputStream) extends ObjectOutputStream(underlying) {

    // Enable replacement so replaceObject is called whenever an object is being serialized.
    enableReplaceObject(true)

    /** Objects currently being serialized, innermost (most recently entered) first. */
    val stack = new mutable.Stack[Object]

    // Set once the failure has been observed; freezes `stack` so it is not modified afterwards.
    private var foundNotSerializableObject = false

    /**
     * Called when [[ObjectOutputStream]] tries to serialize any object. Never actually replaces
     * anything: the object is always returned as is, the call is only used to track the stack.
     */
    override protected def replaceObject(obj: Object): Object = obj match {
      case _: NotSerializableException if depth == 1 =>
        // When an object is not serializable, ObjectOutputStream resets the depth to 1 and writes
        // a NotSerializableException to the stream, and we will catch it here.
        // Once we reach here, the stack is what we want to return back to the caller.
        foundNotSerializableObject = true
        obj
      case _ =>
        if (!foundNotSerializableObject) {
          // Once ObjectOutputStream finishes serializing an object (and its fields), it will
          // decrease the depth field and serialize the next object. We pop the stack since
          // everything above depth has been successfully serialized.
          // The depth cannot change while we are in here, so read it reflectively only once.
          val currentDepth = depth
          while (currentDepth < stack.size) {
            stack.pop()
          }
          stack.push(obj)
        }
        obj
    }

    /** Return the value of the private depth field in [[ObjectOutputStream]]. */
    private def depth: Int = SerializationDebugger.depthField.getInt(this)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.serializer

import java.io.{NotSerializableException, ObjectOutputStream, ByteArrayOutputStream}

import org.scalatest.FunSuite


/**
 * Exercises [[SerializationDebugger.writeObject]] with debugging switched on: once with a value
 * that serializes cleanly and once with an object graph whose innermost element is not
 * serializable.
 */
class SerializationDebuggerSuite extends FunSuite {

  /** A throwaway in-memory object stream to serialize into. */
  private def inMemoryStream(): ObjectOutputStream =
    new ObjectOutputStream(new ByteArrayOutputStream)

  test("normal serialization") {
    SerializationDebugger.enableDebugging = true
    val stream = inMemoryStream()
    // Writing a plain Int must simply succeed.
    SerializationDebugger.writeObject(stream, 1)
    stream.close()
  }

  test("NotSerializableException with stack") {
    SerializationDebugger.enableDebugging = true
    val stream = inMemoryStream()
    // Serializable -> Serializable -> not serializable.
    val innermost = new NotSerializableClass
    val graph = new SerializableClass1(new SerializableClass2(innermost))
    val failure = intercept[NotSerializableException] {
      SerializationDebugger.writeObject(stream, graph)
    }
    stream.close()

    // Every class on the path to the offending object should be named in the message.
    val message = failure.getMessage
    assert(message.contains("SerializableClass1"))
    assert(message.contains("SerializableClass2"))
    assert(message.contains("NotSerializableClass"))
  }
}

/**
 * Test fixture that deliberately does not extend Serializable; the suite places it at the bottom
 * of an object graph to provoke a NotSerializableException.
 */
class NotSerializableClass

/**
 * Serializable test fixture holding a single arbitrary reference.
 *
 * @param a the wrapped object; it is written as part of this instance when serialized
 */
class SerializableClass1(val a: Object) extends Serializable

/**
 * Second serializable test fixture holding a single arbitrary reference, used to add one more
 * level of nesting to the object graph.
 *
 * @param b the wrapped object; it is written as part of this instance when serialized
 */
class SerializableClass2(val b: Object) extends Serializable

0 comments on commit f7e6320

Please sign in to comment.