Skip to content

Commit

Permalink
Convert implicit parameter to context bound
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed May 7, 2014
1 parent c88b269 commit 04d3ae5
Showing 1 changed file with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.graphx.impl

import scala.language.higherKinds
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.Logging
Expand All @@ -31,14 +32,14 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
* implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
* example, [[VertexPartition.VertexPartitionOpsConstructor]]).
*/
private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
(self: T[VD])
(implicit ev: VertexPartitionBaseOpsConstructor[T])
extends Logging {
private[graphx] abstract class VertexPartitionBaseOps
[VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
(self: Self[VD])
extends Logging {

def withIndex(index: VertexIdToIndexMap): T[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
def withMask(mask: BitSet): T[VD]
def withIndex(index: VertexIdToIndexMap): Self[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
def withMask(mask: BitSet): Self[VD]

/**
* Pass each vertex attribute along with the vertex id through a map
Expand All @@ -53,7 +54,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
def map[VD2: ClassTag](f: (VertexId, VD) => VD2): Self[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](self.capacity)
var i = self.mask.nextSetBit(0)
Expand All @@ -73,7 +74,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
def filter(pred: (VertexId, VD) => Boolean): Self[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(self.capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
Expand All @@ -91,7 +92,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`. The indices of `this` and `other` must be the same.
*/
def diff(other: T[VD]): T[VD] = {
def diff(other: Self[VD]): Self[VD] = {
if (self.index != other.index) {
logWarning("Diffing two VertexPartitions with different indexes is slow.")
diff(createUsingIndex(other.iterator))
Expand All @@ -104,14 +105,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
}
i = newMask.nextSetBit(i + 1)
}
ev.toOps(this.withValues(other.values)).withMask(newMask)
toOps(this.withValues(other.values)).withMask(newMask)
}
}

/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: T[VD2])
(f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
(other: Self[VD2])
(f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
if (self.index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
Expand All @@ -131,14 +132,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: Iterator[(VertexId, VD2)])
(f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
(f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
leftJoin(createUsingIndex(other))(f)
}

/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag]
(other: T[U])
(f: (VertexId, VD, U) => VD2): T[VD2] = {
(other: Self[U])
(f: (VertexId, VD, U) => VD2): Self[VD2] = {
if (self.index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
Expand All @@ -150,7 +151,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
i = newMask.nextSetBit(i + 1)
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}
}

Expand All @@ -159,15 +160,15 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
(iter: Iterator[Product2[VertexId, U]])
(f: (VertexId, VD, U) => VD2): T[VD2] = {
(f: (VertexId, VD, U) => VD2): Self[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}

/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
: T[VD2] = {
: Self[VD2] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD2](self.capacity)
iter.foreach { case (vid, vdata) =>
Expand All @@ -177,14 +178,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(pos) = vdata
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

/**
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): T[VD] = {
def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): Self[VD] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD](self.capacity)
System.arraycopy(self.values, 0, newValues, 0, newValues.length)
Expand All @@ -195,12 +196,12 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(pos) = vdata
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): T[VD2] = {
reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD2](self.capacity)
iter.foreach { product =>
Expand All @@ -216,22 +217,29 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
}
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

/**
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): T[VD] = {
def reindex(): Self[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- self.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
}
ev.toOps(
ev.toOps(
this.withIndex(hashMap.keySet))
.withValues(hashMap._values))
.withMask(hashMap.keySet.getBitSet)
this.withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
}

/**
* Converts a vertex partition (in particular, one of type `Self`) into a
* `VertexPartitionBaseOps`. Within this class, this allows chaining the methods defined above,
* because these methods return a `Self` and this implicit conversion re-wraps that in a
* `VertexPartitionBaseOps`. This relies on the context bound on `Self`.
*/
private implicit def toOps[VD2: ClassTag](
partition: Self[VD2]): VertexPartitionBaseOps[VD2, Self] = {
implicitly[VertexPartitionBaseOpsConstructor[Self]].toOps(partition)
}
}

0 comments on commit 04d3ae5

Please sign in to comment.