Unify GraphImpl RDDs + other graph load optimizations
This commit makes the following changes:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs:
vertices, edges, routing table, and triplet view. This commit merges
them down to two: vertices (with routing table) and edges (with
replicated vertices); see the sketch after this list.

2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles
when building a graph: one to extract routing information from the edges
and move it to the vertices, and another to find nonexistent vertices
referred to by edges. With this commit, the latter is done as a side
effect of the former.

3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side
effect of unifying the edges and the triplet view.

4. *Join elimination for mapTriplets.*

5. *Ship only the needed vertex attributes when upgrading the
triplet view.* If the triplet view already contains the source attributes
and both are now needed, ship only the destination attributes rather
than re-shipping both. This is done in `ReplicatedVertexView#upgrade`
and is also covered by the sketch below.
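
For concreteness, here is a minimal, self-contained sketch of the layout described in items 1 and 5. All names and types below are illustrative stand-ins invented for this description, not the actual GraphX classes:

// Item 1: a vertex partition now carries its routing table: for each vertex,
// the set of edge partitions that reference it.
case class VertexPartitionSketch[VD](
    attrs: Map[Long, VD],              // vertex id -> vertex attribute
    routingTable: Map[Long, Set[Int]]) // vertex id -> referencing edge partitions

// Item 1: an edge partition carries replicated vertex attributes next to the
// edges, so the triplet view needs no separate RDD or zipPartitions pass.
case class EdgePartitionSketch[ED, VD](
    edges: Seq[(Long, Long, ED)],      // (srcId, dstId, edge attribute)
    replicated: Map[Long, VD],         // locally shipped vertex attributes
    hasSrcAttrs: Boolean,
    hasDstAttrs: Boolean)

// Item 5: when upgrading the triplet view, determine which attribute sets
// still need to be shipped instead of re-shipping everything.
def attributesToShip(
    part: EdgePartitionSketch[_, _],
    needSrc: Boolean,
    needDst: Boolean): (Boolean, Boolean) =
  (needSrc && !part.hasSrcAttrs, needDst && !part.hasDstAttrs)

Keeping the routing table inside the vertex RDD and the replicated attributes inside the edge RDD is what removes the extra zipPartitions passes: the triplet view is assembled partition-locally from data that is already co-located.
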
ankurdave committed Apr 23, 2014
1 parent d6d60e2 commit e4fbd32
Showing 24 changed files with 1,299 additions and 841 deletions.
63 changes: 38 additions & 25 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.spark.graphx.impl.EdgePartition
+
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * [[org.apache.spark.graphx.impl.ReplicatedVertexView]].
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,33 +48,41 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): EdgeRDD[ED, VD] = {
     partitionsRDD.persist(newLevel)
     this
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
+  override def persist(): EdgeRDD[ED, VD] = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
+  override def cache(): EdgeRDD[ED, VD] = persist()
 
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): EdgeRDD[ED, VD] = {
     partitionsRDD.unpersist(blocking)
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
 
@@ -82,15 +93,21 @@ class EdgeRDD[@specialized ED: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
    * Reverse all the edges in this RDD.
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -102,19 +119,15 @@ class EdgeRDD[@specialized ED: ClassTag](
    * with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
-
-  private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
-  }
 }
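
The hasNext guards added to compute and mapEdgePartitions above share one idea, shown standalone below; the helper name is mine, not the codebase's:

// Sketch of the empty-partition guard: consume the partition's single element
// only if one is present, instead of calling next() unconditionally, which
// throws NoSuchElementException on an empty partition.
def mapSingleElement[A, B](iter: Iterator[A])(f: A => B): Iterator[B] =
  if (iter.hasNext) Iterator(f(iter.next())) else Iterator.empty
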
graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }
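
A usage sketch for the new toTuple accessor; constructing a triplet by hand here is purely for illustration:

import org.apache.spark.graphx.EdgeTriplet

// toTuple flattens a triplet into ((srcId, srcAttr), (dstId, dstAttr), attr),
// which is convenient for pattern matching in RDD transformations.
val t = new EdgeTriplet[String, Int]
t.srcId = 1L; t.dstId = 2L; t.attr = 7
t.srcAttr = "alice"; t.dstAttr = "bob"
assert(t.toTuple == ((1L, "alice"), (2L, "bob"), 7))
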
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {
    * along with their vertex data.
    *
    */
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.graphx.impl._
 import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
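
For context, the registrator is wired in through the standard Kryo settings; a minimal sketch using the configuration keys documented for Spark of this era:

import org.apache.spark.SparkConf

// Select Kryo as the serializer and point it at GraphX's registrator so the
// classes registered above (EdgePartition, RoutingTableMessage, ...) are
// serialized efficiently.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
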
10 changes: 5 additions & 5 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the
-   *        the edge RDD
+   * @param minEdgePartitions the number of partitions for the edge RDD
    */
   def edgeListFile(
       sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
+    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
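
A usage sketch for the loader touched above; the path and partition count are hypothetical:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.GraphLoader

// edgeListFile skips blank lines and '#' comments, parses whitespace-separated
// "srcId dstId" pairs, and returns a Graph[Int, Int].
val sc = new SparkContext("local[2]", "edge-list-example")
val graph = GraphLoader.edgeListFile(
  sc, "hdfs://namenode/data/edges.txt", minEdgePartitions = 4)
println("vertices: " + graph.vertices.count() + ", edges: " + graph.edges.count())
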
14 changes: 8 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
 import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.
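
The setName calls above follow a pattern that also works on any RDD; a standalone sketch with illustrative names:

import org.apache.spark.SparkContext

// setName labels an RDD and returns it, so the label can be attached inline;
// once the RDD is cached, the name identifies it in the Spark web UI.
val sc = new SparkContext("local", "naming-example")
val degreesLike = sc.parallelize(Seq((1L, 3), (2L, 1)))
  .setName("GraphOps.degrees-example")
  .cache()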