[SPARK-7989][Core][Tests] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite #6546

Closed
@@ -17,8 +17,12 @@

package org.apache.spark.ui.jobs

import java.util.concurrent.TimeoutException

import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

import com.google.common.annotations.VisibleForTesting

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
@@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
startTime = appStarted.time
}

/**
* For testing only. Wait until at least `numExecutors` executors are up, or throw
* `TimeoutException` if the waiting time elapses before `numExecutors` executors are up.
*
* @param numExecutors the minimum number of executors to wait for
* @param timeout time to wait in milliseconds
*/
@VisibleForTesting
private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
val finishTime = System.currentTimeMillis() + timeout
while (System.currentTimeMillis() < finishTime) {
val numBlockManagers = synchronized {
blockManagerIds.size
}
if (numBlockManagers >= numExecutors + 1) {
// The driver also registers a BlockManager, hence the + 1.
return
}
// Sleep rather than using wait/notify, because this is used only for testing and wait/notify
// add overhead in the general case.
Thread.sleep(10)
}
throw new TimeoutException(
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}
}
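
For readers unfamiliar with the new helper, here is a minimal usage sketch. The suite name and the job it runs are hypothetical; `SparkFunSuite`, `LocalSparkContext`, the `local-cluster` master string, and the `waitUntilExecutorsUp(2, 10000)` call mirror the tests changed in this PR.

package org.apache.spark

// Illustration only (a sketch, not part of this PR): a test that starts a local-cluster
// SparkContext and blocks until both executors have registered before running a job.
class ExampleClusterSuite extends SparkFunSuite with LocalSparkContext {
  test("job runs only after both executors are up") {
    // `sc` is the var provided by LocalSparkContext; it is stopped after each test.
    sc = new SparkContext("local-cluster[2, 1, 512]", "test")
    // Poll the listener until 2 executors (plus the driver's BlockManager) are registered,
    // or fail the test with a TimeoutException after 10 seconds.
    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
    assert(sc.parallelize(1 to 100, 4).count() === 100L)
  }
}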
@@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])

// On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
// If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
// all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
// local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
// In this case, we won't receive FetchFailed, which will make this test fail.
// Therefore, we should wait until all slaves are up.
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)

val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)

rdd.count()
@@ -17,11 +17,9 @@

package org.apache.spark.broadcast

import scala.concurrent.duration._
import scala.util.Random

import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually._

import org.apache.spark._
import org.apache.spark.io.SnappyCompressionCodec
@@ -312,13 +310,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
// Wait until all slaves are up
eventually(timeout(10.seconds), interval(10.milliseconds)) {
_sc.jobProgressListener.synchronized {
val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
assert(numBlockManagers == numSlaves + 1,
s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
}
}
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
} else {
new SparkContext("local", "test", broadcastConf)
@@ -17,12 +17,12 @@

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

import scala.collection.mutable
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.cluster.ExecutorInfo

/**
* Unit tests for SparkListener that require a local cluster.
@@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

// This test will check if the number of executors received by "SparkListener" is the same as
// the number of all executors, so we need to wait until all executors are up.
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")