Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak committed Jul 24, 2014
2 parents 22d7ebd + 2d25e34 commit 2a18d6b
Showing 29 changed files with 1,416 additions and 422 deletions.
@@ -48,13 +48,15 @@ class KryoSerializer(conf: SparkConf)

private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")

def newKryoOutput() = new KryoOutput(bufferSize)

def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
kryo.setRegistrationRequired(registrationRequired)
val classLoader = Thread.currentThread.getContextClassLoader

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]]
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)
}
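For context, here is a minimal sketch of how an application might opt into the strict mode this option enables, assuming Kryo serialization is already in use. `MyRegistrator`, `MyCustomKey`, and the app name are hypothetical; with `spark.kryo.registrationRequired` set to `true`, serializing any class that has not been registered fails with an exception instead of silently writing the full class name with every object.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

case class MyCustomKey(id: Long)  // hypothetical application class

// Hypothetical registrator that registers the application's own classes.
class MyRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyCustomKey])
    kryo.register(classOf[Array[MyCustomKey]])
  }
}

object StrictKryoExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("strict-kryo-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")
      // Fail fast on unregistered classes instead of paying the cost of
      // writing class names into the serialized stream.
      .set("spark.kryo.registrationRequired", "true")
    val sc = new SparkContext(conf)
    // ... job code: serializing an unregistered class now throws ...
    sc.stop()
  }
}
```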

@@ -475,6 +475,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(
Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
FakeRackUtil.cleanUp()
}

test("test RACK_LOCAL tasks") {
@@ -505,6 +506,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2
// Task 1 can be scheduled with RACK_LOCAL
assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
FakeRackUtil.cleanUp()
}
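The `FakeRackUtil.cleanUp()` calls added in the two tests above follow a common test-hygiene pattern: the rack mapping lives in a shared mutable helper object, so each test that populates it clears it afterwards to avoid leaking state into later tests. The diff only shows the cleanup calls; the object below is a hypothetical sketch of what such a helper looks like, not the actual `FakeRackUtil` source.

```scala
import scala.collection.mutable

// Hypothetical sketch of a shared rack-mapping test helper. Because the map is
// global mutable state, tests that call assignHostToRack must call cleanUp()
// before finishing, or the mapping bleeds into unrelated tests.
object FakeRackUtilSketch {
  private val hostToRack = new mutable.HashMap[String, String]()

  def assignHostToRack(host: String, rack: String): Unit = { hostToRack(host) = rack }
  def getRackForHost(host: String): Option[String] = hostToRack.get(host)
  def cleanUp(): Unit = hostToRack.clear()
}
```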

test("do not emit warning when serialized task is small") {
19 changes: 15 additions & 4 deletions docs/configuration.md
@@ -388,6 +388,17 @@ Apart from these, the following properties are also available, and may be useful
case.
</td>
</tr>
<tr>
<td><code>spark.kryo.registrationRequired</code></td>
<td>false</td>
<td>
Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception
if an unregistered class is serialized. If set to 'false' (the default), Kryo will write
unregistered class names along with each object. Writing class names can cause
significant performance overhead, so enabling this option strictly enforces that a
user has not omitted any classes from registration.
</td>
</tr>
<tr>
<td><code>spark.kryoserializer.buffer.mb</code></td>
<td>2</td>
@@ -497,9 +508,9 @@ Apart from these, the following properties are also available, and may be useful
<tr>
<td>spark.hadoop.validateOutputSpecs</td>
<td>true</td>
<td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
</tr>
</table>
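The recommendation above is to delete stale output directories explicitly rather than disable validation. A hedged sketch of that cleanup, run before `saveAsHadoopFile`/`saveAsTextFile`; the output path is an assumption:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete a pre-existing output directory with Hadoop's FileSystem API so the
// subsequent save passes output-spec validation. The path is hypothetical.
val outputDir = new Path("hdfs:///tmp/my-job-output")
val fs = FileSystem.get(outputDir.toUri, new Configuration())
if (fs.exists(outputDir)) {
  fs.delete(outputDir, true)  // recursive delete
}
// rdd.saveAsTextFile(outputDir.toString)
```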
@@ -861,7 +872,7 @@ Apart from these, the following properties are also available, and may be useful
</table>

#### Cluster Managers
Each cluster manager in Spark has additional configuration options. Configurations
can be found on the pages for each mode:

* [YARN](running-on-yarn.html#configuration)
@@ -35,7 +35,6 @@ class GraphKryoRegistrator extends KryoRegistrator {

def registerClasses(kryo: Kryo) {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[RoutingTableMessage])
kryo.register(classOf[(VertexId, Object)])
kryo.register(classOf[EdgePartition[Object, Object]])
kryo.register(classOf[BitSet])
@@ -27,26 +27,13 @@ import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/**
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
* the edge partition references `vid` in the specified `position` (src, dst, or both).
*/
private[graphx]
class RoutingTableMessage(
var vid: VertexId,
var pid: PartitionID,
var position: Byte)
extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
override def _1 = vid
override def _2 = (pid, position)
override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
}
import org.apache.spark.graphx.impl.RoutingTablePartition.RoutingTableMessage

private[graphx]
class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
/** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
new ShuffledRDD[VertexId, (PartitionID, Byte), (PartitionID, Byte), RoutingTableMessage](
new ShuffledRDD[VertexId, Int, Int, RoutingTableMessage](
self, partitioner).setSerializer(new RoutingTableMessageSerializer)
}
}
@@ -62,6 +49,23 @@ object RoutingTableMessageRDDFunctions

private[graphx]
object RoutingTablePartition {
/**
* A message from an edge partition to a vertex specifying the position in which the edge
* partition references the vertex (src, dst, or both). The edge partition is encoded in the lower
* 30 bits of the Int, and the position is encoded in the upper 2 bits of the Int.
*/
type RoutingTableMessage = (VertexId, Int)

private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
val positionUpper2 = position << 30
val pidLower30 = pid & 0x3FFFFFFF
(vid, positionUpper2 | pidLower30)
}

private def vidFromMessage(msg: RoutingTableMessage): VertexId = msg._1
private def pidFromMessage(msg: RoutingTableMessage): PartitionID = msg._2 & 0x3FFFFFFF
private def positionFromMessage(msg: RoutingTableMessage): Byte = (msg._2 >> 30).toByte

val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)

/** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
@@ -77,7 +81,9 @@ object RoutingTablePartition {
map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
}
map.iterator.map { vidAndPosition =>
new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
val vid = vidAndPosition._1
val position = vidAndPosition._2
toMessage(vid, pid, position)
}
}

@@ -88,9 +94,12 @@ object RoutingTablePartition {
val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
for (msg <- iter) {
pid2vid(msg.pid) += msg.vid
srcFlags(msg.pid) += (msg.position & 0x1) != 0
dstFlags(msg.pid) += (msg.position & 0x2) != 0
val vid = vidFromMessage(msg)
val pid = pidFromMessage(msg)
val position = positionFromMessage(msg)
pid2vid(pid) += vid
srcFlags(pid) += (position & 0x1) != 0
dstFlags(pid) += (position & 0x2) != 0
}

new RoutingTablePartition(pid2vid.zipWithIndex.map {
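To make the new encoding concrete, here is a self-contained sketch of the packing scheme introduced above: the position flags occupy the top 2 bits of the message's Int and the partition id the low 30 bits, which is why `PartitionID` must stay below 2^30. The demo object and sample values are assumptions; the helpers mirror the private `toMessage`/`pidFromMessage`/`positionFromMessage` methods shown in the diff.

```scala
object RoutingMessagePackingDemo {
  type VertexId = Long
  type PartitionID = Int

  // Pack the position byte into the top 2 bits and the partition id into the
  // low 30 bits of the Int half of the message.
  def pack(vid: VertexId, pid: PartitionID, position: Byte): (VertexId, Int) =
    (vid, (position << 30) | (pid & 0x3FFFFFFF))

  def pid(msg: (VertexId, Int)): PartitionID = msg._2 & 0x3FFFFFFF

  // Arithmetic shift (>>) sign-extends the top bits, so only the two low bits
  // of the returned Byte are meaningful -- exactly how the flag checks above use it.
  def position(msg: (VertexId, Int)): Byte = (msg._2 >> 30).toByte

  def main(args: Array[String]): Unit = {
    // 0x3 = referenced as both src (bit 0) and dst (bit 1)
    val msg = pack(vid = 42L, pid = 7, position = 0x3.toByte)
    val isSrc = (position(msg) & 0x1) != 0
    val isDst = (position(msg) & 0x2) != 0
    println((msg._1, pid(msg), isSrc, isDst))  // prints (42,7,true,true)
  }
}
```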
@@ -24,9 +24,11 @@ import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.serializer._

import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.RoutingTablePartition.RoutingTableMessage

private[graphx]
class RoutingTableMessageSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
@@ -35,10 +37,8 @@ class RoutingTableMessageSerializer extends Serializer with Serializable {
new ShuffleSerializationStream(s) {
def writeObject[T: ClassTag](t: T): SerializationStream = {
val msg = t.asInstanceOf[RoutingTableMessage]
writeVarLong(msg.vid, optimizePositive = false)
writeUnsignedVarInt(msg.pid)
// TODO: Write only the bottom two bits of msg.position
s.write(msg.position)
writeVarLong(msg._1, optimizePositive = false)
writeInt(msg._2)
this
}
}
@@ -47,10 +47,8 @@ class RoutingTableMessageSerializer extends Serializer with Serializable {
new ShuffleDeserializationStream(s) {
override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readUnsignedVarInt()
val c = s.read()
if (c == -1) throw new EOFException
new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T]
val b = readInt()
(a, b).asInstanceOf[T]
}
}
}
@@ -30,7 +30,7 @@ package object graphx {
*/
type VertexId = Long

/** Integer identifier of a graph partition. */
/** Integer identifier of a graph partition. Must be less than 2^30. */
// TODO: Consider using Char.
type PartitionID = Int
