Commit 844f4ca

Merge remote-tracking branch 'origin/master' into sql-external-sort
JoshRosen committed Jul 8, 2015
2 parents 293f109 + 08192a1
Showing 145 changed files with 2,010 additions and 1,091 deletions.
@@ -76,12 +76,13 @@ private[deploy] object JsonProtocol {
}

def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
@@ -105,4 +105,6 @@ private[spark] class WorkerInfo(
def setState(state: WorkerState.Value): Unit = {
this.state = state
}

def isAlive(): Boolean = this.state == WorkerState.ALIVE
}
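
Taken together, the two hunks above make the master's JSON status report aggregate cores and memory only over workers in the ALIVE state, using the new WorkerInfo.isAlive() helper, instead of over every worker the master has ever registered. A minimal sketch of the changed semantics, with hypothetical worker data rather than the committed types:

    // Hypothetical illustration of the aggregation change; Worker here
    // stands in for the real WorkerInfo.
    case class Worker(state: String, cores: Int, coresUsed: Int)

    val workers = Seq(Worker("ALIVE", 8, 4), Worker("DEAD", 8, 0))

    // Before: every known worker contributed to the totals.
    val coresBefore = workers.map(_.cores).sum              // 16

    // After: only workers currently reporting ALIVE are counted.
    val aliveWorkers = workers.filter(_.state == "ALIVE")
    val coresAfter = aliveWorkers.map(_.cores).sum          // 8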
@@ -18,18 +18,18 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList}
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(

val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

// Offer constraints
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

var nextMesosTaskId = 0

@volatile var appId: String = _
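
The new slaveOfferConstraints field is parsed once, at backend construction, from the spark.mesos.constraints setting via parseConstraintString. As a rough sketch of what such parsing involves (a hypothetical stand-in, not Spark's implementation, assuming an attr1:v1,v2;attr2:v3 format in which a bare attribute means a presence-only check):

    // Hypothetical stand-in for parseConstraintString.
    // "os:centos7;zone:us-east-1a,us-east-1b" ->
    //   Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b"))
    def parseConstraints(spec: String): Map[String, Set[String]] = {
      if (spec.isEmpty) {
        Map.empty
      } else {
        spec.split(';').map { clause =>
          clause.split(':') match {
            case Array(attr)         => attr -> Set.empty[String] // presence-only
            case Array(attr, values) => attr -> values.split(',').toSet
          }
        }.toMap
      }
    }

A user would then opt in with something like --conf spark.mesos.constraints="os:centos7" on submission; offers whose attributes do not satisfy every constraint are declined, as the resourceOffers hunk below shows.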
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()

for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
if (totalCoresAcquired < maxCores &&
mem >= MemoryUtils.calculateTotalMemory(sc) &&
val id = offer.getId.getValue
if (meetsConstraints &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.addResources(createResource("mem", calculateTotalMemory(sc)))

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
}

d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(List(offer.getId), List(task.build()), filters)
} else {
// Filter it out
d.launchTasks(
Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
}
}
}
}

/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
.build()
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
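
The accept/decline decision in resourceOffers above hinges on matchesAttributeRequirements. A hedged sketch of what that check amounts to, with offer attributes simplified to plain strings (a real Mesos offer carries typed Attribute protobufs):

    // Hypothetical sketch of the constraint check: every required
    // attribute must be present, and when a value set was given, the
    // offer's value must fall inside it.
    def matchesRequirements(
        constraints: Map[String, Set[String]],
        offerAttributes: Map[String, String]): Boolean = {
      constraints.forall { case (attr, allowedValues) =>
        offerAttributes.get(attr) match {
          case None    => false // required attribute missing from the offer
          case Some(v) => allowedValues.isEmpty || allowedValues.contains(v)
        }
      }
    }

Note also the behavioral change in the hunk: offers that fail the check are now declined outright with d.declineOffer rather than answered with an empty launchTasks call, which returns those resources to the Mesos allocator for other frameworks.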
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}

import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkException, TaskState}

/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(

private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)

// Offer constraints
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

@volatile var appId: String = _

override def start() {
Expand All @@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
@@ -115,14 +119,14 @@
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(mesosExecutorCores).build())
.setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -191,13 +195,31 @@
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
val offerAttributes = toAttributeMap(o.getAttributesList)

// check if all constraints are satisfied
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)

// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
val id = o.getId.getValue
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")

meetsRequirements
}

// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))

val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
@@ -223,15 +245,15 @@
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}
}

// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -251,8 +273,6 @@
d.declineOffer(o.getId)
}

// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}
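
Condensing the new requirements logic above: a slave without an executor must satisfy the attribute constraints plus the memory and CPU minimums, while a slave that already hosts an executor only needs enough CPU for one more task. A hypothetical condensed form of that predicate (names here are illustrative, not the committed code):

    // Hypothetical condensed form of the offer-usability predicate.
    def offerIsUsable(
        meetsConstraints: Boolean,
        mem: Double, requiredMem: Double,
        cpus: Double, executorCores: Double, cpusPerTask: Int,
        slaveHasExecutor: Boolean): Boolean = {
      val meetsMemoryRequirements = mem >= requiredMem
      // Needs CPU for the executor itself plus at least one task.
      val meetsCPURequirements = cpus >= (executorCores + cpusPerTask)
      (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
        (slaveHasExecutor && cpus >= cpusPerTask)
    }

The other change visible in this hunk is ordering: unusable offers are now declined immediately after the partition, before task placement, instead of at the end of resourceOffers.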

