Skip to content

Commit

Permalink
Making EdgePartition serializable.
Browse files Browse the repository at this point in the history
  • Loading branch information
jegonzal committed May 10, 2014
1 parent 905173d commit 67dac22
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.BitSet

import org.apache.spark.graphx.impl._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
import org.apache.spark.util.collection.OpenHashSet


/**
* Registers GraphX classes with Kryo for improved performance.
Expand All @@ -43,7 +46,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[PartitionStrategy])
kryo.register(classOf[BoundedPriorityQueue[Object]])
kryo.register(classOf[EdgeDirection])

kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
kryo.register(classOf[OpenHashSet[Int]])
kryo.register(classOf[OpenHashSet[Long]])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/**
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
Expand All @@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
@transient val srcIds: Array[VertexId],
@transient val dstIds: Array[VertexId],
@transient val data: Array[ED],
@transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
@transient val vertices: VertexPartition[VD],
@transient val activeSet: Option[VertexSet] = None
val srcIds: Array[VertexId] = null,
val dstIds: Array[VertexId] = null,
val data: Array[ED] = null,
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
val vertices: VertexPartition[VD] = null,
val activeSet: Option[VertexSet] = None
) extends Serializable {

/** Return a new `EdgePartition` with the specified edge data. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.Sorting
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

private[graphx]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
Expand All @@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
val srcIds = new Array[VertexId](edgeArray.size)
val dstIds = new Array[VertexId](edgeArray.size)
val data = new Array[ED](edgeArray.size)
val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/**
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/**
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
Expand Down Expand Up @@ -69,7 +69,7 @@ object RoutingTablePartition {
: Iterator[RoutingTableMessage] = {
// Determine which positions each vertex id appears in using a map where the low 2 bits
// represent src and dst
val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
edgePartition.srcIds.iterator.foreach { srcId =>
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/** Stores vertex attributes to ship to an edge partition. */
private[graphx]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

private[graphx] object VertexPartition {
/** Construct a `VertexPartition` from the given vertices. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

private[graphx] object VertexPartitionBase {
/**
Expand All @@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
: (VertexIdToIndexMap, Array[VD], BitSet) = {
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map(pair._1) = pair._2
}
Expand All @@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: (VertexIdToIndexMap, Array[VD], BitSet) = {
val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.Logging
import org.apache.spark.util.collection.BitSet

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

/**
* An class containing additional operations for subclasses of VertexPartitionBase that provide
Expand Down Expand Up @@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): Self[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- self.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.reflect._
* Under the hood, it uses our OpenHashSet implementation.
*/
private[graphx]
class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
@specialized(Long, Int, Double) V: ClassTag](
val keySet: OpenHashSet[K], var _values: Array[V])
extends Iterable[(K, V)]
Expand Down

0 comments on commit 67dac22

Please sign in to comment.