Clean closures that are not currently cleaned
Now the test added in the previous commit passes!
Andrew Or committed Apr 29, 2015
1 parent 19e33b4 commit 9ac5f9b
Showing 4 changed files with 22 additions and 9 deletions.
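
To illustrate why these call sites matter, here is a minimal, hypothetical sketch (not part of this commit): a user function defined inside a non-serializable enclosing class. Depending on how the compiler generates the closure, it may carry a reference to that enclosing instance; SparkContext.clean() runs the ClosureCleaner to drop unused outer references (and to check up front that the closure is serializable) before the task is shipped to executors. The class, object, and value names below are made up; keyBy is one of the methods this commit changes to clean its argument.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: JobRunner is deliberately not Serializable.
class JobRunner(sc: SparkContext) {
  def keyedByDouble(): Array[(Int, Int)] = {
    val factor = 2  // a local value, serializable on its own
    // keyBy(f) now calls sc.clean(f) as of this commit, like map already did.
    sc.parallelize(1 to 4).keyBy(x => x * factor).collect()
  }
}

object ClosureCleaningExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("closure-cleaning-example")
    val sc = new SparkContext(conf)
    try {
      new JobRunner(sc).keyedByDouble().foreach(println)  // prints (2,1), (4,2), (6,3), (8,4)
    } finally {
      sc.stop()
    }
  }
}

If a closure such as x => x * factor did drag the whole JobRunner instance into the serialized task, the job could fail with a NotSerializableException; cleaning the user's function before wrapping or submitting it is what the changes below add for the methods that were missing it.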
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1632,7 +1632,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       partitions: Seq[Int],
       allowLocal: Boolean
     ): Array[U] = {
-    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
+    val cleanedFunc = clean(func)
+    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
   }
 
   /**
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -119,7 +119,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
+    val cleanedF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -131,7 +131,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
 
-    combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
+    val cleanedSeqOp = self.context.clean(seqOp)
+    val cleanedCombOp = self.context.clean(combOp)
+    combineByKey[U](
+      (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner)
   }
 
   /**
@@ -177,7 +180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
-    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+    val cleanedFunc = self.context.clean(func)
+    combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
   }
 
   /**
17 changes: 12 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -637,8 +637,11 @@ abstract class RDD[T: ClassTag](
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
-    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
-    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
+      preservesPartitioning)
   }
 
   /**
@@ -650,8 +653,11 @@ abstract class RDD[T: ClassTag](
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
-    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
-    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning)
   }
 
   /**
@@ -1334,7 +1340,8 @@ abstract class RDD[T: ClassTag](
    * Creates tuples of the elements in this RDD by applying `f`.
    */
   def keyBy[K](f: T => K): RDD[(K, T)] = {
-    map(x => (f(x), x))
+    val cleanedF = sc.clean(f)
+    map(x => (cleanedF(x), x))
   }
 
   /** A private method for tests, to look at the contents of each partition */