Commit df3caa3: Address comments

Andrew Or committed May 2, 2015
1 parent 7a3cc80

Showing 2 changed files with 3 additions and 5 deletions.
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

@@ -119,8 +119,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
-    val cleanedF = self.context.clean(f)
-    self.context.submitJob[T, Unit, Unit](self, cleanedF, Range(0, self.partitions.length),
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }
 }
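Why the caller-side cleaning can be dropped here: the likely rationale, consistent with the PR this commit belongs to, is that `SparkContext.submitJob` now cleans the `processPartition` closure it receives, so cleaning `f` again in `foreachPartitionAsync` did redundant work. Below is a minimal sketch of that "clean once, at the entry point" pattern; the names (`Ctx`, `clean`, `submitJob`) mirror Spark's, but the code is illustrative, not the actual Spark source.

    // A minimal sketch of the "clean once, at the entry point" pattern,
    // with hypothetical stand-ins for SparkContext. Not the actual Spark source.
    object CleanOnceSketch {
      class Ctx {
        // Stand-in for SparkContext#clean: ClosureCleaner nulls out unused
        // outer references and fails fast on non-serializable closures.
        def clean[F <: AnyRef](f: F): F = f

        // The entry point cleans the closure it receives...
        def submitJob[T, U](processPartition: Iterator[T] => U): Unit = {
          val cleanedFunc = clean(processPartition)
          // ...and only `cleanedFunc` would ever be shipped to executors.
        }
      }

      // ...so a thin wrapper like foreachPartitionAsync can forward `f` raw,
      // instead of cleaning it a second time.
      def foreachPartitionAsync[T](ctx: Ctx, f: Iterator[T] => Unit): Unit =
        ctx.submitJob[T, Unit](f)
    }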
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

@@ -131,10 +131,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
 
+    // We will clean the combiner closure later in `combineByKey`
     val cleanedSeqOp = self.context.clean(seqOp)
-    val cleanedCombOp = self.context.clean(combOp)
-    combineByKey[U](
-      (v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, cleanedCombOp, partitioner)
+    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
   }
 
   /**
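The second hunk follows the same principle with a twist: per the new comment, `combineByKey` cleans the combiner closure itself, so pre-cleaning `combOp` was redundant. `seqOp`, by contrast, is still cleaned eagerly, presumably because it is captured inside the `createCombiner` lambda built at this call site, where `combineByKey`'s own cleaning would not reach it. A hedged sketch of that division of labor (hypothetical helpers, not the Spark API):

    // Hypothetical sketch of who cleans what after this change.
    // Assumes combineByKey cleans the closures it receives, per the diff comment.
    object WhoCleansWhatSketch {
      def clean[F <: AnyRef](f: F): F = f // stand-in for SparkContext#clean

      def combineByKey[V, U](
          createCombiner: V => U,
          mergeValue: (U, V) => U,
          mergeCombiners: (U, U) => U): Unit = {
        // The callee cleans its own arguments, so callers need not pre-clean
        // mergeCombiners (the combOp in the diff above).
        val _ = (clean(createCombiner), clean(mergeValue), clean(mergeCombiners))
      }

      def aggregateByKey[V, U](zero: () => U)(seqOp: (U, V) => U, combOp: (U, U) => U): Unit = {
        // seqOp is cleaned eagerly because the createCombiner lambda below
        // captures it; combOp is handed to combineByKey raw.
        val cleanedSeqOp = clean(seqOp)
        combineByKey[V, U]((v: V) => cleanedSeqOp(zero(), v), cleanedSeqOp, combOp)
      }
    }

Either way, each closure ends up cleaned exactly once before it is serialized and shipped.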
