[SPARK-34541][CORE] Fixed an issue where data could not be cleaned up when unregisterShuffle

### What changes were proposed in this pull request?
Fixed an issue where shuffle data could not be cleaned up when `unregisterShuffle` is called.

### Why are the changes needed?
When the old shuffle fetch protocol is used, the partition id serves as the mapId when constructing a `ShuffleBlockId`, but `context.taskAttemptId()` is what gets cached as the mapId in `taskIdMapsForShuffle` when `getWriter[K, V]` is called. (The mapId passed into `getWriter` is chosen by the map task itself: with `spark.shuffle.useOldFetchProtocol=true` it is the partition id, otherwise `context.taskAttemptId()`.)

As a result, data cannot be cleaned up when `unregisterShuffle` runs: removing a shuffle's metadata iterates the mapIds cached in `taskIdMapsForShuffle`, and those mapIds are `context.taskAttemptId()` values instead of partition ids, so they never match the block files that were actually written.
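To make the mismatch concrete, here is a small, self-contained toy model of the failure (plain Scala; the file names follow Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>` scheme, but `ShuffleLeakDemo` and the ids are illustrative, not Spark code):

```scala
import scala.collection.mutable

object ShuffleLeakDemo {
  def main(args: Array[String]): Unit = {
    // Under the old fetch protocol, the map task writes its output files
    // keyed by the partition id (0 here).
    val filesOnDisk = mutable.Set("shuffle_0_0_0.data", "shuffle_0_0_0.index")
    // Before the fix, getWriter cached context.taskAttemptId() (say 5) instead.
    val cachedMapIds = Seq(5L)
    // Cleanup deletes files named after the cached mapIds, as
    // unregisterShuffle does with taskIdMapsForShuffle.
    for (mapId <- cachedMapIds; ext <- Seq("data", "index")) {
      filesOnDisk -= s"shuffle_0_${mapId}_0.$ext" // targets shuffle_0_5_0.* and misses
    }
    println(s"Leaked files: $filesOnDisk") // both shuffle_0_0_0.* files remain
  }
}
```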

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Added a new test (see the `ShuffleSuite` changes below).

Closes #31664 from yikf/master.

Authored-by: yikf <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
yikf authored and srowen committed Mar 8, 2021
Commit f340857 (parent: 02e74b2)
Showing 2 changed files with 32 additions and 2 deletions.
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala (1 addition, 1 deletion)
@@ -144,7 +144,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
     val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
       handle.shuffleId, _ => new OpenHashSet[Long](16))
-    mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
+    mapTaskIds.synchronized { mapTaskIds.add(mapId) }
     val env = SparkEnv.get
     handle match {
       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
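With this change, `getWriter` caches the same mapId that was used to name the task's output files, so cleanup resolves the right files under either fetch protocol. For reference, a minimal sketch of the block-id naming involved (assumes `spark-core` on the classpath; `BlockNameDemo` is illustrative, the block id classes are Spark's own):

```scala
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

object BlockNameDemo {
  def main(args: Array[String]): Unit = {
    // Old fetch protocol: map output files are keyed by the partition id,
    // so shuffle 0 / partition 0 / reduce 0 produces:
    println(ShuffleDataBlockId(0, 0, 0).name)  // shuffle_0_0_0.data
    println(ShuffleIndexBlockId(0, 0, 0).name) // shuffle_0_0_0.index
    // A cleanup keyed by a cached taskAttemptId (say 5) would look for:
    println(ShuffleDataBlockId(0, 5, 0).name)  // shuffle_0_5_0.data -- a miss
  }
}
```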
core/src/test/scala/org/apache/spark/ShuffleSuite.scala (31 additions, 1 deletion)
@@ -17,9 +17,14 @@

package org.apache.spark

import java.io.File
import java.util.{Locale, Properties}
import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService}

import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

@@ -29,7 +34,7 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
import org.apache.spark.util.MutablePair
@@ -419,6 +424,31 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext

    manager.unregisterShuffle(0)
  }

test("SPARK-34541: shuffle can be removed") {
withTempDir { tmpDir =>
def getAllFiles: Set[File] =
FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
conf.set("spark.local.dir", tmpDir.getAbsolutePath)
sc = new SparkContext("local", "test", conf)
// For making the taskAttemptId starts from 1.
sc.parallelize(1 to 10).count()
val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
// Create a shuffleRdd
val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
.setSerializer(new JavaSerializer(conf))
val filesBeforeShuffle = getAllFiles
// Force the shuffle to be performed
shuffledRdd.count()
// Ensure that the shuffle actually created files that will need to be cleaned up
val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
// Check that the cleanup actually removes the files
sc.env.blockManager.master.removeShuffle(0, blocking = true)
for (file <- filesCreatedByShuffle) {
assert (!file.exists(), s"Shuffle file $file was not cleaned up")
}
}
}
}

