Skip to content

Commit

Permalink
Remove unused VertexBroadcastMsg
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Jul 23, 2014
1 parent 5f7b991 commit ab71364
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class GraphKryoRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
kryo.register(classOf[RoutingTableMessage])
kryo.register(classOf[(VertexId, Object)])
kryo.register(classOf[EdgePartition[Object, Object]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,6 @@ import org.apache.spark.graphx.{PartitionID, VertexId}
import org.apache.spark.rdd.{ShuffledRDD, RDD}


private[graphx]
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: PartitionID,
var vid: VertexId,
var data: T)
extends Product2[PartitionID, (VertexId, T)] with Serializable {

override def _1 = partition

override def _2 = (vid, data)

override def canEqual(that: Any): Boolean = that.isInstanceOf[VertexBroadcastMsg[_]]
}


/**
* A message used to send a specific value to a partition.
* @param partition index of the target partition.
Expand All @@ -59,25 +44,6 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/* , AnyRe
}


private[graphx]
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
val rdd = new ShuffledRDD[PartitionID, (VertexId, T), (VertexId, T), VertexBroadcastMsg[T]](
self, partitioner)

// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
rdd.setSerializer(new IntVertexBroadcastMsgSerializer)
} else if (classTag[T] == ClassTag.Long) {
rdd.setSerializer(new LongVertexBroadcastMsgSerializer)
} else if (classTag[T] == ClassTag.Double) {
rdd.setSerializer(new DoubleVertexBroadcastMsgSerializer)
}
rdd
}
}


private[graphx]
class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {

Expand All @@ -95,10 +61,6 @@ object MsgRDDFunctions {
implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
new MsgRDDFunctions(rdd)
}

implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
new VertexBroadcastMsgRDDFunctions(rdd)
}
}

private[graphx]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,78 +76,6 @@ class VertexIdMsgSerializer extends Serializer with Serializable {
}
}

/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
private[graphx]
class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {

override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeVarLong(msg.vid, optimizePositive = false)
writeInt(msg.data)
this
}
}

override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readInt()
new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
}
}
}
}

/** A special shuffle serializer for VertexBroadcastMessage[Long]. */
private[graphx]
class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {

override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeVarLong(msg.vid, optimizePositive = false)
writeLong(msg.data)
this
}
}

override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
}
}
}
}

/** A special shuffle serializer for VertexBroadcastMessage[Double]. */
private[graphx]
class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {

override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
this
}
}

override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
}
}
}
}

/** A special shuffle serializer for AggregationMessage[Int]. */
private[graphx]
class IntAggMsgSerializer extends Serializer with Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,69 +32,6 @@ import org.apache.spark.serializer.SerializationStream

class SerializerSuite extends FunSuite with LocalSparkContext {

test("IntVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)

intercept[EOFException] {
inStrm.readObject()
}
}

test("LongVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
val bout = new ByteArrayOutputStream
val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)

intercept[EOFException] {
inStrm.readObject()
}
}

test("DoubleVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)

intercept[EOFException] {
inStrm.readObject()
}
}

test("IntAggMsgSerializer") {
val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
Expand Down Expand Up @@ -152,15 +89,6 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}

test("TestShuffleVertexBroadcastMsg") {
withSpark { sc =>
val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
new VertexBroadcastMsg[Int](pid, pid, pid)
}
bmsgs.partitionBy(new HashPartitioner(3)).collect()
}
}

test("variable long encoding") {
def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
val bout = new ByteArrayOutputStream
Expand Down

0 comments on commit ab71364

Please sign in to comment.