Unify GraphImpl RDDs + other graph load optimizations #497

Closed · wants to merge 17 commits
22 changes: 14 additions & 8 deletions docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
[Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
explore the new GraphX API and comment on issues that may complicate the transition from Bagel.

+ ## Upgrade Guide from Spark 0.9.1
+ 
+ GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+ 
+ [EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
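
As a sketch of what this change means for user code (assuming a previously built `graph: Graph[String, Double]`), only the type signature needs updating:

{% highlight scala %}
val graph: Graph[String, Double] = ???
// Spark 0.9.1: the edge RDD was typed only by the edge attribute
// val edges: EdgeRDD[Double] = graph.edges
// Spark 1.0: EdgeRDD also carries the vertex attribute type
val edges: EdgeRDD[Double, String] = graph.edges
{% endhighlight %}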

# Getting Started

To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
{% highlight scala %}
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED, VD]
}
{% endhighlight %}

- The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
- VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
+ The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+ VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED, VD]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

## EdgeRDDs

- The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+ The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using one
of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
each partition, edge attributes and adjacency structure are stored separately, enabling maximum
reuse when changing attribute values.
@@ -918,11 +924,11 @@
The three additional functions exposed by the `EdgeRDD` are:
{% highlight scala %}
// Transform the edge attributes while preserving the structure
- def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
// Reverse the edges reusing both attributes and structure
- def reverse: EdgeRDD[ED]
+ def reverse: EdgeRDD[ED, VD]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
- def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+ def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
{% endhighlight %}
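
A short usage sketch of the first two operations, assuming a previously constructed `graph: Graph[String, Int]`:

{% highlight scala %}
// Negate every edge attribute; the adjacency structure is reused
val negated: EdgeRDD[Int, String] = graph.edges.mapValues(e => -e.attr)
// Flip the direction of every edge, again reusing the structure
val reversed: EdgeRDD[Int, String] = graph.edges.reverse
{% endhighlight %}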

In most applications we have found that operations on the `EdgeRDD` are accomplished through the
56 changes: 35 additions & 21 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
- import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

+ import org.apache.spark.graphx.impl.EdgePartition

/**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
*/
- class EdgeRDD[@specialized ED: ClassTag](
-     val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+     val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
- p.next._2.iterator.map(_.copy())
+ val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
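// The partition may be empty, so check before consuming the single EdgePartition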
+ if (p.hasNext) {
+   p.next._2.iterator.map(_.copy())
+ } else {
+   Iterator.empty
+ }
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
this
}

- private[graphx] def mapEdgePartitions[ED2: ClassTag](
-     f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-   new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-     val (pid, ep) = iter.next()
-     Iterator(Tuple2(pid, f(pid, ep)))
+ private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+     f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+   new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
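// A partition may carry no (pid, EdgePartition) pair at all; preserve its emptiness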
+     if (iter.hasNext) {
+       val (pid, ep) = iter.next()
+       Iterator(Tuple2(pid, f(pid, ep)))
+     } else {
+       Iterator.empty
+     }
}, preservesPartitioning = true))
}

@@ -76,15 +87,22 @@ class EdgeRDD[@specialized ED: ClassTag](
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
- def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+ def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
- def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+ def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)

+ /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+ def filter(
+     epred: EdgeTriplet[VD, ED] => Boolean,
+     vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+   mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+ }

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
* with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
-     (other: EdgeRDD[ED2])
-     (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+     (other: EdgeRDD[ED2, _])
+     (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
- new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}

- private[graphx] def collectVertexIds(): RDD[VertexId] = {
-   partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
- }
}
graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }

override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()

+ def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
}
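
A small usage sketch for the new `toTuple` helper, assuming a `graph: Graph[String, Int]`:

{% highlight scala %}
// Each triplet becomes ((srcId, srcAttr), (dstId, dstAttr), attr)
val asTuples = graph.triplets.map(_.toTuple)
{% endhighlight %}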
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {
* along with their vertex data.
*
*/
- @transient val edges: EdgeRDD[ED]
+ @transient val edges: EdgeRDD[ED, VD]

/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx

import com.esotericsoftware.kryo.Kryo

- import org.apache.spark.graphx.impl._
import org.apache.spark.serializer.KryoRegistrator
- import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
+ import org.apache.spark.util.collection.BitSet
+ 
+ import org.apache.spark.graphx.impl._

/**
* Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
+ kryo.register(classOf[RoutingTableMessage])
kryo.register(classOf[(VertexId, Object)])
- kryo.register(classOf[EdgePartition[Object]])
+ kryo.register(classOf[EdgePartition[Object, Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
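
For context, the registrator is enabled through standard Spark configuration; a minimal sketch (property names as of Spark 1.0):

{% highlight scala %}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
{% endhighlight %}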
10 changes: 5 additions & 5 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
- * @param minEdgePartitions the number of partitions for the
- *        the edge RDD
+ * @param minEdgePartitions the number of partitions for the edge RDD
*/
def edgeListFile(
sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
val startTime = System.currentTimeMillis

// Parse the edge data table directly into edge partitions
- val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-   val builder = new EdgePartitionBuilder[Int]
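// textFile may yield more partitions than requested; coalesce down to minEdgePartitions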
+ val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+ val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+   val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
- }.cache()
+ }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
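
A typical call, sketched with a hypothetical HDFS path:

{% highlight scala %}
// Load an edge list, orienting edges positively and requesting 4 edge partitions
val graph = GraphLoader.edgeListFile(sc, "hdfs://data/followers.txt",
  canonicalOrientation = true, minEdgePartitions = 4)
{% endhighlight %}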
17 changes: 11 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
package org.apache.spark.graphx

import scala.reflect.ClassTag
- import org.apache.spark.SparkContext._
+ import scala.util.Random
+ 
import org.apache.spark.SparkException
- import org.apache.spark.graphx.lib._
+ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
- import scala.util.Random
+ 
+ import org.apache.spark.graphx.lib._

/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
- @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+ @transient lazy val inDegrees: VertexRDD[Int] =
+   degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
- @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+ @transient lazy val outDegrees: VertexRDD[Int] =
+   degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
- @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+ @transient lazy val degrees: VertexRDD[Int] =
+   degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

/**
* Computes the neighboring vertex degrees.
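
The named degree RDDs are easier to spot in the Spark UI. A small sketch that uses one of them, assuming a previously built `graph`:

{% highlight scala %}
// Find the vertex with the greatest total degree
val maxDegree: (VertexId, Int) =
  graph.degrees.reduce((a, b) => if (a._2 > b._2) a else b)
{% endhighlight %}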
6 changes: 5 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx

import scala.reflect.ClassTag
+ import org.apache.spark.Logging


/**
@@ -52,7 +53,7 @@
* }}}
*
*/
- object Pregel {
+ object Pregel extends Logging {

/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
@@ -142,6 +143,9 @@ object Pregel {
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
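
With the new logging, each Pregel iteration is visible in the driver logs. A minimal sketch of a computation that would exercise it (single-source shortest paths, assuming `graph: Graph[Long, Double]` and a hypothetical source vertex 0):

{% highlight scala %}
val sourceId: VertexId = 0L
// Initialize all distances to infinity except the source
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),   // vertex program
  triplet =>                                        // send messages along improving edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    },
  (a, b) => math.min(a, b))                         // merge incoming messages
{% endhighlight %}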