From 75af062291666577a8a3b9c7c5549c98f5ed9fef Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 6 May 2014 22:22:59 -0700 Subject: [PATCH] Add explicit return types --- .../org/apache/spark/graphx/EdgeTriplet.scala | 2 +- .../spark/graphx/impl/Serializers.scala | 36 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala index d9b19f88f5ecf..9d473d5ebda44 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -64,5 +64,5 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() - def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr) + def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index 8992d3770801a..4eb4349ed68b2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -29,26 +29,28 @@ private[graphx] class RoutingTableMessageSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { - override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { - def writeObject[T](t: T) = { - val msg = t.asInstanceOf[RoutingTableMessage] - writeVarLong(msg.vid, optimizePositive = false) - writeUnsignedVarInt(msg.pid) - // TODO: Write only the bottom two bits of msg.position - s.write(msg.position) - this + override def serializeStream(s: OutputStream): SerializationStream = + new ShuffleSerializationStream(s) { + def writeObject[T](t: T): SerializationStream = { + val msg = t.asInstanceOf[RoutingTableMessage] + writeVarLong(msg.vid, optimizePositive = false) + writeUnsignedVarInt(msg.pid) + // TODO: Write only the bottom two bits of msg.position + s.write(msg.position) + this + } } - } - override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { - override def readObject[T](): T = { - val a = readVarLong(optimizePositive = false) - val b = readUnsignedVarInt() - val c = s.read() - if (c == -1) throw new EOFException - new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T] + override def deserializeStream(s: InputStream): DeserializationStream = + new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readUnsignedVarInt() + val c = s.read() + if (c == -1) throw new EOFException + new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T] + } } - } } }