diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index 04ec3a406dd49..b4a5a5d9b9433 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -345,13 +345,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft executionOfInterruptibleCounter.getAndIncrement() } - val sem = new Semaphore(0) val taskCompletedSem = new Semaphore(0) - Future { - taskStartedSemaphore.acquire() - f.cancel() - sem.release() - } sc.addSparkListener(new SparkListener { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { @@ -368,8 +362,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft } }) - // Make sure code in the Future block is finished, a.k.a tasks are being cancelled. - sem.acquire() + taskStartedSemaphore.acquire() + f.cancel() + val e = intercept[SparkException] { f.get() }.getCause assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))