Skip to content

Commit

Permalink
Fix the resource request decrease scenario and add a test for it.
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Mar 16, 2022
1 parent b01149c commit 603ba97
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (h._1 > c) {
// Prepend updated first req to times, constant time op
((h._1 - c, h._2)) +=: times
c = 0
} else {
c = c - h._1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,118 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
}
}

test("exec alloc decrease.") {
  // Scenario: the requested total for a custom resource profile is raised to 1 and then lowered
  // back to 0 before any executor registers, after which executors are requested in the default
  // profile. Two executors then register in the default profile and one in the custom profile.
  // The test checks that every recorded request time is sane and that exactly one executor (the
  // one that was no longer requested) ends up without a request time.

  // Lower bound for every request time recorded during this test.
  val testStartTime = System.currentTimeMillis()

  val execCores = 3
  val conf = new SparkConf()
    .set(EXECUTOR_CORES, execCores)
    .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
    .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
    .setMaster(
      "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
    .setAppName("test")
  conf.set(TASK_GPU_ID.amountConf, "1")
  conf.set(EXECUTOR_GPU_ID.amountConf, "1")

  sc = new SparkContext(conf)
  val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
  val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
  val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
  sc.resourceProfileManager.addResourceProfile(rp)
  assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
  val backend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
  // Ask for one executor in the new rp ...
  backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
  // ... then decrease the number of execs requested in the new rp back to zero.
  backend.requestTotalExecutors(Map((rp.id, 0)), Map(), Map())
  // Request execs in the default profile.
  backend.requestExecutors(3)
  val mockEndpointRef = mock[RpcEndpointRef]
  val mockAddress = mock[RpcAddress]
  when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => {})

  val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "3")))

  // Collect the info of every executor the listener bus reports as added; the request times are
  // validated at the end of the test, once the bus has been drained.
  val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
  val listener = new SparkListener() {
    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
      infos += executorAdded.executorInfo
    }
  }

  sc.addSparkListener(listener)

  // Register two executors in the default profile and one in the (no longer requested) new rp.
  backend.driverEndpoint.askSync[Boolean](
    RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
  backend.driverEndpoint.askSync[Boolean](
    RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
  backend.driverEndpoint.askSync[Boolean](
    RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
      rp.id))

  val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
  val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
  val buffer = new SerializableBuffer(bytebuffer)

  val initialResources = backend.getExecutorAvailableResources("1")
  assert(initialResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))

  val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
  assert(exec3ResourceProfileId === rp.id)

  val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
  val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
    "t1", 0, 1, mutable.Map.empty[String, Long],
    mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
    new Properties(), 1, taskResources, bytebuffer)))
  val ts = backend.getTaskSchedulerImpl()
  when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

  backend.driverEndpoint.send(ReviveOffers)

  // Once the task is launched, GPU address "0" must be assigned on executor "1".
  eventually(timeout(5.seconds)) {
    val whileRunning = backend.getExecutorAvailableResources("1")
    assert(whileRunning(GPU).availableAddrs.sorted === Array("1", "3"))
    assert(whileRunning(GPU).assignedAddrs === Array("0"))
  }

  // To avoid allocating any resources immediately after releasing the resource from the task to
  // make sure that `availableAddrs` below won't change
  when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(Seq.empty)
  backend.driverEndpoint.send(
    StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))

  // After the task finishes, all GPU addresses must be available again.
  eventually(timeout(5.seconds)) {
    val afterFinish = backend.getExecutorAvailableResources("1")
    assert(afterFinish(GPU).availableAddrs.sorted === Array("0", "1", "3"))
    assert(afterFinish(GPU).assignedAddrs.isEmpty)
  }
  sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
  // Upper bound for request times: every executor-added event has been delivered by now.
  // NOTE(review): like the `testStartTime` lower bound, this assumes the backend stamps request
  // times with the same wall clock as System.currentTimeMillis() - confirm.
  val allExecutorsAddedTime = System.currentTimeMillis()
  assert(infos.size === 3)
  infos.foreach { info =>
    info.requestTime.foreach { t =>
      assert(t > 0,
        "Exec request times don't make sense")
      assert(t >= testStartTime,
        "Exec allocation and request times don't make sense")
      // The request must precede the point at which all executors were seen as added. (The
      // previous check compared the request time with itself and could never fail.)
      assert(t <= allExecutorsAddedTime,
        "Exec allocation and request times don't make sense")
    }
  }
  // The executor registered in the rp whose requested total was lowered to 0 was not requested,
  // so it is the only one expected to lack a request time.
  assert(infos.count(_.requestTime.isEmpty) === 1,
    "Our unexpected executor does not have a request time.")
}


private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
sc.submitJob(
rdd,
Expand Down

0 comments on commit 603ba97

Please sign in to comment.