[SPARK-4080] Only throw IOException from [write|read][Object|External] #2932

Closed
wants to merge 1 commit
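Every hunk below rewrites the body of a writeObject/readObject (or writeExternal/readExternal) method as Utils.tryOrIOException { ... }. The helper itself is not visible in these hunks; a minimal sketch of what such a wrapper presumably looks like, assuming it lives in org.apache.spark.util.Utils and simply rethrows non-fatal failures wrapped in an IOException, is:

// Sketch only, not the committed code: run the block and surface any
// non-fatal, non-IO failure as an IOException, which keeps the java.io
// serialization hooks within their declared exception contract
// (readObject/writeObject may only throw IOException, plus
// ClassNotFoundException on the read side).
import java.io.IOException
import scala.util.control.NonFatal

def tryOrIOException(block: => Unit): Unit = {
  try {
    block
  } catch {
    case ioe: IOException => throw ioe          // already within the contract
    case NonFatal(e) => throw new IOException(e) // wrap anything else
  }
}

With that wrapper in place, an unexpected runtime failure raised while a task, broadcast, or status object is being (de)serialized reaches the caller as an IOException whose cause is the original exception, instead of escaping the serialization machinery through an undeclared exception type.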
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils

/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
}

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}

private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
}

/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream

/**
@@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
}

/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream) {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}

/** Used by the JVM when deserializing this object. */
private def readObject(in: ObjectInputStream) {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef

import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils

private[spark] class ApplicationInfo(
val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date

import org.apache.spark.deploy.DriverDescription
import org.apache.spark.util.Utils

private[spark] class DriverInfo(
val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed

private def readObject(in: java.io.ObjectInputStream) : Unit = {
private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.util.Utils

private[spark]
class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
override val index: Int = idx

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
s1 = rdd1.partitions(s1Index)
s2 = rdd2.partitions(s2Index)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
) extends CoGroupSplitDep {

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
split = rdd.partitions(splitIndex)
oos.defaultWriteObject()
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.util.Utils

/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition(
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = parentsIndices.map(rdd.partitions(_))
oos.defaultWriteObject()
@@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def index: Int = slice

@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {

val sfactory = SparkEnv.get.serializer

@@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = {
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {

val sfactory = SparkEnv.get.serializer
sfactory match {
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.util.Utils

/**
* Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
@@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition(
override def hashCode(): Int = idx

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = rdds.map(_.partitions(index)).toArray
oos.defaultWriteObject()
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

/**
* Partition for UnionRDD.
@@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag](
override val index: Int = idx

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
parentPartition = rdd.partitions(parentRddPartitionIndex)
oos.defaultWriteObject()
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.util.Utils

private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition(
def partitions = partitionValues

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
partitionValues = rdds.map(rdd => rdd.partitions(idx))
oos.defaultWriteObject()
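The remaining hunks apply the same change to Externalizable implementations (CompressedMapStatus, HighlyCompressedMapStatus, DirectTaskResult, JavaSerializer, BlockManagerId, a message class in BlockManagerMessages, and StorageLevel). For those, writeExternal and readExternal are public interface methods that already declare throws IOException, so only the body changes: whatever it throws is now surfaced as an IOException. A small self-contained illustration of the pattern, using a hypothetical class rather than code from the patch:

// Hypothetical example (not part of the patch): an Externalizable value that
// applies the same wrapping as the classes below.
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import scala.util.control.NonFatal

class Counter(var count: Long) extends Externalizable {
  def this() = this(0L)  // public no-arg constructor required by Externalizable

  override def writeExternal(out: ObjectOutput): Unit = tryOrIOException {
    out.writeLong(count)
  }

  override def readExternal(in: ObjectInput): Unit = tryOrIOException {
    count = in.readLong()
  }

  // Same shape as the (assumed) Utils.tryOrIOException helper sketched above.
  private def tryOrIOException(block: => Unit): Unit =
    try block catch {
      case ioe: IOException => throw ioe
      case NonFatal(e) => throw new IOException(e)
    }
}

Keeping the wrapper private here only mirrors the shape of the helper; the patch itself centralizes it in Utils so every implementation shares one definition.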
@@ -22,6 +22,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -105,13 +106,13 @@ private[spark] class CompressedMapStatus(
MapStatus.decompressSize(compressedSizes(reduceId))
}

override def writeExternal(out: ObjectOutput): Unit = {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

override def readExternal(in: ObjectInput): Unit = {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
val len = in.readInt()
compressedSizes = new Array[Byte](len)
@@ -152,13 +153,13 @@ private[spark] class HighlyCompressedMapStatus private (
}
}

override def writeExternal(out: ObjectOutput): Unit = {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}

override def readExternal(in: ObjectInput): Unit = {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
@@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long

def this() = this(null.asInstanceOf[ByteBuffer], null, null)

override def writeExternal(out: ObjectOutput) {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {

out.writeInt(valueBytes.remaining);
Utils.writeByteBuffer(valueBytes, out)
@@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
out.writeObject(metrics)
}

override def readExternal(in: ObjectInput) {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {

val blen = in.readInt()
val byteVal = new Array[Byte](blen)
@@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
new JavaSerializerInstance(counterReset, classLoader)
}

override def writeExternal(out: ObjectOutput) {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(counterReset)
}

override def readExternal(in: ObjectInput) {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
counterReset = in.readInt()
}
}
@@ -61,13 +61,13 @@ class BlockManagerId private (

def isDriver: Boolean = (executorId == "<driver>")

override def writeExternal(out: ObjectOutput) {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
}

override def readExternal(in: ObjectInput) {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
@@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}

import akka.actor.ActorRef

import org.apache.spark.util.Utils

private[spark] object BlockManagerMessages {
//////////////////////////////////////////////////////////////////////////////////
// Messages from the master to slaves.
@@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages {

def this() = this(null, null, null, 0, 0, 0) // For deserialization only

override def writeExternal(out: ObjectOutput) {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
out.writeUTF(blockId.name)
storageLevel.writeExternal(out)
@@ -74,7 +76,7 @@
out.writeLong(tachyonSize)
}

override def readExternal(in: ObjectInput) {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
blockManagerId = BlockManagerId(in)
blockId = BlockId(in.readUTF())
storageLevel = StorageLevel(in)
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
@@ -97,12 +98,12 @@ class StorageLevel private(
ret
}

override def writeExternal(out: ObjectOutput) {
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeByte(toInt)
out.writeByte(_replication)
}

override def readExternal(in: ObjectInput) {
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val flags = in.readByte()
_useDisk = (flags & 8) != 0
_useMemory = (flags & 4) != 0
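For completeness, a hypothetical end-to-end demonstration (not from the patch) of the behavior this change standardizes on: a runtime failure raised inside readObject now reaches the reader as an IOException whose cause is the original exception.

// Hypothetical demo class and driver; names and the failing check are assumptions.
import java.io._
import scala.util.control.NonFatal

class Flaky(var id: Int) extends Serializable {
  private def readObject(in: ObjectInputStream): Unit =
    try {
      in.defaultReadObject()
      require(id != 42, "assume some re-initialization that can fail")
    } catch {
      case ioe: IOException => throw ioe
      case NonFatal(e) => throw new IOException(e) // same idea as Utils.tryOrIOException
    }
}

object Demo extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new Flaky(42))
  out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject() catch {
    case e: IOException => println(s"caught IOException caused by: ${e.getCause}")
  }
}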