Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
[DCOS-51158] Improved Task ID assignment for Executor tasks (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
akirillov authored and Alex Lembiewski committed Jun 12, 2019
1 parent f4d7757 commit d91f193
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -611,14 +611,12 @@ private[spark] class MesosClusterScheduler(
partitionResources(remainingResources.asJava, "mem", desc.mem)
offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")

val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
.getOrElse(""))

TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setName(s"Driver for ${desc.name}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.setContainer(getContainerInfo(desc))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)

private val schedulerUuid: String = UUID.randomUUID().toString

private var nextMesosTaskId = 0

@volatile var appId: String = _

private var schedulerDriver: SchedulerDriver = _

def newMesosTaskId(): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
id.toString
}
private val schedulerUuid: String = UUID.randomUUID().toString
private val nextExecutorNumber = new AtomicLong()

override def start() {
super.start()
Expand Down Expand Up @@ -528,7 +521,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
if (canLaunchTask(slaveId, offer.getHostname, resources)) {
// Create a task
launchTasks = true
val taskId = newMesosTaskId()
val taskSeqNumber = nextExecutorNumber.getAndIncrement()
val taskId = s"${schedulerUuid}-$taskSeqNumber"
val offerCPUs = getResource(resources, "cpus").toInt
val offerGPUs = getResource(resources, "gpus").toInt
var taskGPUs = executorGpus(offerGPUs)
Expand All @@ -541,10 +535,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue( s"$schedulerUuid-$taskId").build())
.setTaskId(TaskID.newBuilder().setValue(taskId).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")
.setName(s"${sc.appName} $taskSeqNumber")
.setLabels(MesosProtoUtils.mesosLabels(taskLabels))
.addAllResources(resourcesToUse.asJava)
.setContainer(getContainerInfo(sc.conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ trait MesosSchedulerUtils extends Logging {
* the same frameworkID. To enforce that only the first driver registers with the configured
* framework ID, the driver calls this method after the first registration.
*/
@deprecated("Multiple Spark Contexts and fine-grained scheduler are deprecated")
def unsetFrameworkID(sc: SparkContext) {
sc.conf.remove("spark.mesos.driver.frameworkId")
System.clearProperty("spark.mesos.driver.frameworkId")
Expand Down

0 comments on commit d91f193

Please sign in to comment.