Unify GraphImpl RDDs + other graph load optimizations
This PR makes the following changes, primarily in e4fbd32:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with the routing table) and edges (with replicated vertices).

2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.

3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.

4. *Join elimination for mapTriplets.*

5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains the source attributes and both are now needed, only the destination attributes are shipped rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`, sketched below.
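
The following Scala sketch illustrates the shipping decision in point 5. It is not the actual `ReplicatedVertexView#upgrade` implementation: the class, fields, and helper below are hypothetical stand-ins, and the real code operates on the graph's vertex and edge RDDs rather than plain booleans.

```scala
// Hypothetical sketch of "ship only the missing vertex attributes".
// hasSrc/hasDst track which attribute sets are already replicated to the
// edge partitions; shipVertexAttributes stands in for the shuffle that
// moves vertex attributes to the edge partitions.
class TripletViewSketch(private var hasSrc: Boolean, private var hasDst: Boolean) {
  def upgrade(includeSrc: Boolean, includeDst: Boolean): Unit = {
    // Ship only the attribute sets that are requested but not yet present.
    val shipSrc = includeSrc && !hasSrc
    val shipDst = includeDst && !hasDst
    if (shipSrc || shipDst) {
      shipVertexAttributes(shipSrc, shipDst)
      hasSrc = hasSrc || shipSrc
      hasDst = hasDst || shipDst
    }
  }

  private def shipVertexAttributes(shipSrc: Boolean, shipDst: Boolean): Unit = {
    // Placeholder for the shuffle that moves the requested vertex
    // attributes to the edge partitions.
    println(s"shipping srcAttr=$shipSrc, dstAttr=$shipDst")
  }
}
```

For example, if the triplet view was previously built using only source attributes and a later operation needs both, `upgrade(true, true)` ships only the destination attributes.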

Author: Ankur Dave <[email protected]>

Closes apache#497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration
ankurdave authored and pdeyhim committed Jun 25, 2014
1 parent 762ba0b commit 39c0105
Showing 28 changed files with 1,353 additions and 851 deletions.
22 changes: 14 additions & 8 deletions docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
[Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
explore the new GraphX API and comment on issues that may complicate the transition from Bagel.

## Upgrade Guide from Spark 0.9.1

GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.

[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
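
As a minimal illustration (the `edgesOf` helper below is hypothetical, not part of the GraphX API), code that names the edge RDD type changes as follows:

{% highlight scala %}
import org.apache.spark.graphx.{EdgeRDD, Graph}

// Spark 0.9.1 signature, shown for comparison:
// def edgesOf[VD, ED](graph: Graph[VD, ED]): EdgeRDD[ED] = graph.edges

// EdgeRDD now also carries the vertex attribute type VD of its graph.
def edgesOf[VD, ED](graph: Graph[VD, ED]): EdgeRDD[ED, VD] = graph.edges
{% endhighlight %}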

# Getting Started

To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
{% highlight scala %}
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val edges: EdgeRDD[ED, VD]
}
{% endhighlight %}

The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val edges: EdgeRDD[ED, VD]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

## EdgeRDDs

The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
each partition, edge attributes and adjacency structure are stored separately, enabling maximum
reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
The three additional functions exposed by the `EdgeRDD` are:
{% highlight scala %}
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
def reverse: EdgeRDD[ED, VD]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
{% endhighlight %}
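
For example, assuming a previously constructed `graph: Graph[Double, Int]` (vertex attributes are `Double`, edge attributes are `Int`), these operations might be combined as follows:

{% highlight scala %}
// Double every edge attribute while preserving the partition structure.
val doubled: EdgeRDD[Int, Double] = graph.edges.mapValues(e => e.attr * 2)
// Reverse all edges, reusing both attributes and structure.
val reversed: EdgeRDD[Int, Double] = graph.edges.reverse
// Join the doubled edges back against the originals, which share the same
// partitioning, combining the two attribute values of each edge.
val tripled: EdgeRDD[Int, Double] =
  doubled.innerJoin(graph.edges) { (src, dst, twice, once) => twice + once }
{% endhighlight %}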

In most applications we have found that operations on the `EdgeRDD` are accomplished through the
56 changes: 35 additions & 21 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition

/**
* `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
* for performance.
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
* partition for performance. It may additionally store the vertex attributes associated with each
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
p.next._2.iterator.map(_.copy())
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
this
}

private[graphx] def mapEdgePartitions[ED2: ClassTag](
f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}

@@ -76,15 +87,22 @@ class EdgeRDD[@specialized ED: ClassTag](
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)

/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
* with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
(other: EdgeRDD[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}

private[graphx] def collectVertexIds(): RDD[VertexId] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
}
graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }

override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()

def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
}
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
* along with their vertex data.
*
*/
@transient val edges: EdgeRDD[ED]
@transient val edges: EdgeRDD[ED, VD]

/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.graphx.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.BitSet

import org.apache.spark.graphx.impl._

/**
* Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
kryo.register(classOf[RoutingTableMessage])
kryo.register(classOf[(VertexId, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[EdgePartition[Object, Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
10 changes: 5 additions & 5 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for the
* the edge RDD
* @param minEdgePartitions the number of partitions for the edge RDD
*/
def edgeListFile(
sc: SparkContext,
@@ -60,8 +59,9 @@
val startTime = System.currentTimeMillis

// Parse the edge data table directly into edge partitions
val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int]
val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
Expand All @@ -78,7 +78,7 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
}.cache()
}.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
17 changes: 11 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
package org.apache.spark.graphx

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import scala.util.Random

import org.apache.spark.SparkException
import org.apache.spark.graphx.lib._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.util.Random

import org.apache.spark.graphx.lib._

/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
@transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
@transient lazy val inDegrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
@transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
@transient lazy val outDegrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
@transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
@transient lazy val degrees: VertexRDD[Int] =
degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

/**
* Computes the neighboring vertex degrees.
6 changes: 5 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx

import scala.reflect.ClassTag
import org.apache.spark.Logging


/**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
* }}}
*
*/
object Pregel {
object Pregel extends Logging {

/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
@@ -142,6 +143,9 @@ object Pregel {
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)