Commit c3b6f11

Added a unit test for clean up.
rxin committed Jul 18, 2014
1 parent 754085f commit c3b6f11
Showing 2 changed files with 40 additions and 27 deletions.
core/src/main/scala/org/apache/spark/rdd/RDD.scala (5 changes: 3 additions & 2 deletions)
@@ -35,12 +35,13 @@
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1212,7 +1213,7 @@ abstract class RDD[T: ClassTag](
* task gets a different copy of the RDD. This provides stronger isolation between tasks that
* might modify state of objects referenced in their closures.
*/
-@transient private[spark] lazy val broadcasted = {
+@transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
val ser = SparkEnv.get.closureSerializer.newInstance()
sc.broadcast(ser.serialize(this).array())
}
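
The broadcasted val above serializes the RDD with the closure serializer and broadcasts the resulting bytes, so each task can fetch and deserialize its own copy of the RDD instead of receiving one serialized into every task. A minimal executor-side sketch of how such a broadcast could be consumed (rebuildRDD is a hypothetical helper, not part of this commit; it assumes a live SparkEnv and the Spark 1.x SerializerInstance API):

import java.nio.ByteBuffer
import org.apache.spark.SparkEnv
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Rebuild a fresh copy of the RDD from the broadcast bytes. Each call
// deserializes anew, which is what gives tasks isolated copies.
def rebuildRDD(bytes: Broadcast[Array[Byte]]): RDD[_] = {
  val ser = SparkEnv.get.closureSerializer.newInstance()
  ser.deserialize[RDD[_]](ByteBuffer.wrap(bytes.value))
}
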
core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala (62 changes: 37 additions & 25 deletions)
@@ -52,9 +52,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
}

-
test("cleanup RDD") {
-val rdd = newRDD.persist()
+val rdd = newRDD().persist()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))

@@ -67,7 +66,7 @@
}

test("cleanup shuffle") {
-val (rdd, shuffleDeps) = newRDDWithShuffleDependencies
+val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))

@@ -80,7 +79,7 @@
}

test("cleanup broadcast") {
-val broadcast = newBroadcast
+val broadcast = newBroadcast()
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))

// Explicit cleanup
@@ -89,7 +88,7 @@
}

test("automatically cleanup RDD") {
-var rdd = newRDD.persist()
+var rdd = newRDD().persist()
rdd.count()

// Test that GC does not cause RDD cleanup due to a strong reference
@@ -107,7 +106,7 @@
}

test("automatically cleanup shuffle") {
-var rdd = newShuffleRDD
+var rdd = newShuffleRDD()
rdd.count()

// Test that GC does not cause shuffle cleanup due to a strong reference
@@ -125,7 +124,7 @@
}

test("automatically cleanup broadcast") {
-var broadcast = newBroadcast
+var broadcast = newBroadcast()

// Test that GC does not cause broadcast cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
@@ -141,11 +140,23 @@
postGCTester.assertCleanup()
}

test("automatically cleanup broadcast data for task dispatching") {
var rdd = newRDDWithShuffleDependencies()._1
rdd.count() // This triggers an action that broadcasts the RDDs.

// Test that GC causes broadcast task data cleanup after dereferencing the RDD.
val postGCTester = new CleanerTester(sc,
broadcastIds = Seq(rdd.broadcasted.id, rdd.firstParent.broadcasted.id))
rdd = null
runGC()
postGCTester.assertCleanup()
}

test("automatically cleanup RDD + shuffle + broadcast") {
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
-val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
-val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -175,8 +186,8 @@

val numRdds = 10
val numBroadcasts = 4 // Broadcasts are more costly
-val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
-val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -197,17 +208,18 @@

//------ Helper functions ------

-def newRDD = sc.makeRDD(1 to 10)
-def newPairRDD = newRDD.map(_ -> 1)
-def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
-def newBroadcast = sc.broadcast(1 to 100)
-def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
+private def newRDD() = sc.makeRDD(1 to 10)
+private def newPairRDD() = newRDD().map(_ -> 1)
+private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
+private def newBroadcast() = sc.broadcast(1 to 100)
+
+private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
}
}
-val rdd = newShuffleRDD
+val rdd = newShuffleRDD()

// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
@@ -216,34 +228,34 @@
(rdd, shuffleDeps)
}

-def randomRdd = {
+private def randomRdd() = {
val rdd: RDD[_] = Random.nextInt(3) match {
-case 0 => newRDD
-case 1 => newShuffleRDD
-case 2 => newPairRDD.join(newPairRDD)
+case 0 => newRDD()
+case 1 => newShuffleRDD()
+case 2 => newPairRDD.join(newPairRDD())
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
}

-def randomBroadcast = {
+private def randomBroadcast() = {
sc.broadcast(Random.nextInt(Int.MaxValue))
}

/** Run GC and make sure it actually has run */
-def runGC() {
+private def runGC() {
val weakRef = new WeakReference(new Object())
val startTime = System.currentTimeMillis
System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
// Wait until a weak reference object has been GCed
-while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
System.gc()
Thread.sleep(200)
}
}

-def cleaner = sc.cleaner.get
+private def cleaner = sc.cleaner.get
}


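The runGC() helper in the suite relies on a standard JVM idiom: hold only a WeakReference to a throwaway object and poll until the reference clears, which proves a collection cycle actually completed instead of trusting the System.gc() hint alone. A standalone sketch of the same idiom (plain Scala, no Spark required; the forceGC name and defaults are illustrative, not from this commit):

import java.lang.ref.WeakReference

// Returns true once a GC cycle has demonstrably run, false on timeout.
def forceGC(timeoutMs: Long = 10000): Boolean = {
  val ref = new WeakReference(new Object()) // only weakly reachable
  val deadline = System.currentTimeMillis + timeoutMs
  while (ref.get != null && System.currentTimeMillis < deadline) {
    System.gc() // a hint; the JVM may ignore it
    Thread.sleep(200)
  }
  ref.get == null // a cleared reference means the object was collected
}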
