SPARK-1786: Edge Partition Serialization #724

Closed · wants to merge 3 commits
GraphKryoRegistrator.scala

@@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.OpenHashSet
+
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -43,8 +46,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[PartitionStrategy])
     kryo.register(classOf[BoundedPriorityQueue[Object]])
     kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
+    kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
+    kryo.register(classOf[OpenHashSet[Int]])
+    kryo.register(classOf[OpenHashSet[Long]])
   }
 }
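
These registrations only take effect when an application actually turns Kryo on. A minimal, illustrative driver-side setup (the two settings below are the same ones exercised by the new EdgePartitionSuite test further down; nothing here is part of the patch itself):

import org.apache.spark.SparkConf

// Enable Kryo and point it at the GraphX registrator so the classes
// registered above are serialized by registration id rather than by
// writing out full class names.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")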
EdgePartition.scala

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@
 private[graphx]
 class EdgePartition[
     @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    @transient val srcIds: Array[VertexId],
-    @transient val dstIds: Array[VertexId],
-    @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
-    @transient val vertices: VertexPartition[VD],
-    @transient val activeSet: Option[VertexSet] = None
+    val srcIds: Array[VertexId] = null,
+    val dstIds: Array[VertexId] = null,
+    val data: Array[ED] = null,
+    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
+    val vertices: VertexPartition[VD] = null,
+    val activeSet: Option[VertexSet] = None
   ) extends Serializable {
 
   /** Return a new `EdgePartition` with the specified edge data. */
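
This is the heart of the fix: the columnar fields were previously @transient, so a serialized EdgePartition came back with its arrays missing; dropping the annotation lets the contents travel with the object, and the null defaults keep a cheap construction path available to serializers. A standalone sketch (not from the patch) of what @transient does to a field under plain Java serialization:

import java.io._

// Illustrative Holder class: the @transient array is skipped when the
// object is written, so it comes back null after a round trip, just as
// the old EdgePartition fields did.
class Holder(@transient val data: Array[Int], val n: Int) extends Serializable

val out = new ByteArrayOutputStream()
new ObjectOutputStream(out).writeObject(new Holder(Array(1, 2, 3), 3))
val h = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  .readObject().asInstanceOf[Holder]
assert(h.data == null && h.n == 3)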
EdgePartitionBuilder.scala

@@ -23,7 +23,7 @@ import scala.util.Sorting
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx]
 class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@
     val srcIds = new Array[VertexId](edgeArray.size)
     val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
+    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
    if (edgeArray.length > 0) {
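
For orientation, this builder is how the columnar partitions above get made in the first place. A hedged usage sketch from inside the org.apache.spark.graphx package (the builder is private[graphx]; the test suite's makeEdgePartition helper wraps the same calls):

// Accumulate edges, then freeze them into sorted columnar arrays plus a
// source-vertex index keyed by GraphXPrimitiveKeyOpenHashMap.
val builder = new EdgePartitionBuilder[Int, Int]
builder.add(0L, 1L, 10)
builder.add(1L, 0L, 20)
builder.add(1L, 2L, 30)
val edgePartition = builder.toEdgePartition
assert(edgePartition.size == 3)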
EdgeTripletIterator.scala

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
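
The iterator itself is internal, but it ultimately backs the public Graph.triplets view. A small sketch of that public API (the graph and its values are illustrative, not from the patch):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Graph.triplets pairs every edge with the attributes of both endpoints,
// which is exactly what this iterator assembles per edge partition.
def showTriplets(sc: SparkContext): Unit = {
  val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
  val edges = sc.parallelize(Seq(Edge(1L, 2L, 7)))
  val graph = Graph(vertices, edges)
  graph.triplets.collect().foreach { t =>
    println(s"${t.srcAttr} -(${t.attr})-> ${t.dstAttr}")
  }
}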
RoutingTablePartition.scala

@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
     : Iterator[RoutingTableMessage] = {
     // Determine which positions each vertex id appears in using a map where the low 2 bits
     // represent src and dst
-    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
     edgePartition.srcIds.iterator.foreach { srcId =>
       map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
     }
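
The byte value here acts as a tiny bit set: per the comment above, bit 0 (0x1) records that a vertex id appears as a source, and presumably bit 1 (0x2) records the destination side (the dst loop falls outside the visible hunk). A self-contained sketch of that encoding:

// Bit 0 marks "appears as src", bit 1 marks "appears as dst".
var positions: Byte = 0
positions = (positions | 0x1).toByte  // seen as a source
positions = (positions | 0x2).toByte  // seen as a destination
assert((positions & 0x1) != 0)        // src bit set
assert((positions & 0x2) != 0)        // dst bit set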
ShippableVertexPartition.scala

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /** Stores vertex attributes to ship to an edge partition. */
 private[graphx]
VertexPartition.scala

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartition {
   /** Construct a `VertexPartition` from the given vertices. */
VertexPartitionBase.scala

@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartitionBase {
   /**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
    */
   def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map(pair._1) = pair._2
     }
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
    */
  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map.setMerge(pair._1, pair._2, mergeFunc)
     }
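
The two initFrom overloads differ only in how duplicate vertex ids are handled: the first keeps the last value seen, while the second combines duplicates with mergeFunc via setMerge. The observable behavior, sketched here with an ordinary Map rather than the private hash map:

// Duplicate id 1L arrives twice; a mergeFunc of (old, v) => old + v
// combines the values instead of overwriting, matching the second overload.
val pairs = Seq((1L, 10), (2L, 20), (1L, 5))
val merged = pairs.foldLeft(Map.empty[Long, Int]) { case (m, (id, v)) =>
  m.updated(id, m.get(id).fold(v)(old => old + v))
}
assert(merged == Map(1L -> 15, 2L -> 20))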
VertexPartitionBaseOps.scala

@@ -25,7 +25,7 @@ import org.apache.spark.Logging
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * An class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
    * Construct a new VertexPartition whose index contains only the vertices in the mask.
    */
   def reindex(): Self[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     val arbitraryMerge = (a: VD, b: VD) => a
     for ((k, v) <- self.iterator) {
       hashMap.setMerge(k, v, arbitraryMerge)
GraphXPrimitiveKeyOpenHashMap.scala (renamed from PrimitiveKeyOpenHashMap.scala)

@@ -29,7 +29,7 @@ import scala.reflect._
  * Under the hood, it uses our OpenHashSet implementation.
  */
 private[graphx]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     @specialized(Long, Int, Double) V: ClassTag](
     val keySet: OpenHashSet[K], var _values: Array[V])
   extends Iterable[(K, V)]
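
The rename distinguishes this class from Spark core's own PrimitiveKeyOpenHashMap, which keeps the Kryo registration above unambiguous. A hedged usage sketch from inside the graphx package (the class is private[graphx]); changeValue is the same call the routing-table code relies on:

// Insert, then merge on collision: changeValue stores the default when the
// key is absent and applies the merge function when it is present.
val map = new GraphXPrimitiveKeyOpenHashMap[Long, Int]
map(1L) = 10                          // plain insert
map.changeValue(1L, 1, _ + 1)         // key present: 10 becomes 11
map.changeValue(2L, 1, _ + 1)         // key absent: stores the default 1
assert(map(1L) == 11 && map(2L) == 1)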
EdgePartitionSuite.scala

@@ -22,6 +22,9 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+
 import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
@@ -120,4 +123,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(!ep.isActive(-1))
     assert(ep.numActives == Some(2))
   }
+
+  test("Kryo serialization") {
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+    val s = new KryoSerializer(conf).newInstance()
+    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+    assert(aSer.srcIds.toList === a.srcIds.toList)
+    assert(aSer.dstIds.toList === a.dstIds.toList)
+    assert(aSer.data.toList === a.data.toList)
+    assert(aSer.index != null)
+    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+  }
 }