Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
dragos committed Jul 28, 2015
2 parents 475e346 + 15724fa commit a2eb3b9
Showing 279 changed files with 7,910 additions and 5,468 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -1566,7 +1566,7 @@ setMethod("fillna",
#' @return a local R data.frame representing the contingency table. The first column of each row
#' will be the distinct values of `col1` and the column names will be the distinct values
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
#' occurrences will have `null` as their counts.
#' occurrences will have zero as their counts.
#'
#' @rdname statfunctions
#' @export
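The doc fix above reflects what `crosstab` actually returns: pairs of (`col1`, `col2`) values that never occur are counted as zero, not `null`, and the first result column is named `$col1_$col2`. A minimal Scala sketch of the same behaviour through the DataFrame API (the `sc`, `sqlContext`, and column names here are illustrative assumptions, not part of this commit):

import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext `sc`.
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq(
  ("a", 1), ("a", 2), ("b", 1)
)).toDF("key", "value")

// The first result column is named "key_value"; the pair ("b", 2), which never
// occurs in the data, is reported with a count of 0 rather than null.
df.stat.crosstab("key", "value").show()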
4 changes: 2 additions & 2 deletions R/pkg/R/deserialize.R
@@ -102,11 +102,11 @@ readList <- function(con) {

readRaw <- function(con) {
dataLen <- readInt(con)
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readRawLen <- function(con, dataLen) {
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readDeserialize <- function(con) {
3 changes: 0 additions & 3 deletions R/pkg/R/sparkR.R
@@ -104,16 +104,13 @@ sparkR.init <- function(
return(get(".sparkRjsc", envir = .sparkREnv))
}

sparkMem <- Sys.getenv("SPARK_MEM", "1024m")
jars <- suppressWarnings(normalizePath(as.character(sparkJars)))

# Classpath separator is ";" on Windows
# URI needs four slashes (////) as per http://stackoverflow.com/a/18522792
if (.Platform$OS.type == "unix") {
collapseChar <- ":"
uriSep <- "//"
} else {
collapseChar <- ";"
uriSep <- "////"
}

6 changes: 3 additions & 3 deletions R/pkg/inst/tests/test_mllib.R
@@ -35,8 +35,8 @@ test_that("glm and predict", {

test_that("predictions match with native glm", {
training <- createDataFrame(sqlContext, iris)
model <- glm(Sepal_Width ~ Sepal_Length, data = training)
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-9), rVals - vals)
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -82,4 +82,4 @@ fi

export PYSPARK_DRIVER_PYTHON
export PYSPARK_DRIVER_PYTHON_OPTS
exec "$SPARK_HOME"/bin/spark-submit pyspark-shell-main "$@"
exec "$SPARK_HOME"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -35,4 +35,4 @@ set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

call %SPARK_HOME%\bin\spark-submit2.cmd pyspark-shell-main %*
call %SPARK_HOME%\bin\spark-submit2.cmd pyspark-shell-main --name "PySparkShell" %*
4 changes: 4 additions & 0 deletions conf/log4j.properties.template
@@ -10,3 +10,7 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
@@ -150,6 +150,11 @@ private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
}

@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
}

public long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
@@ -257,7 +262,7 @@ public void insertRecord(
currentPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;

freeSpaceInCurrentPage -= totalSpaceRequired;
sorter.insertRecord(recordAddress, prefix);
}

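The line added in insertRecord restores the page bookkeeping: advancing the write cursor has to go hand in hand with shrinking the remaining free space, otherwise a later record may be placed beyond the usable space of the current page instead of triggering a new page allocation. A standalone Scala sketch of that invariant (names here are made up for illustration; this is not the commit's Java code):

// Illustrative bump-pointer page cursor.
final class PageCursor(pageSize: Long) {
  private var position: Long = 0L
  private var freeSpace: Long = pageSize

  // Returns false when the caller must allocate a fresh page first.
  def insert(recordSize: Long): Boolean = {
    if (recordSize > freeSpace) {
      false
    } else {
      position += recordSize   // advance the write cursor
      freeSpace -= recordSize  // the accounting the commit adds back
      true
    }
  }
}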
@@ -10,3 +10,7 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
@@ -10,3 +10,7 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
@@ -24,11 +24,23 @@ package org.apache.spark
private[spark] trait ExecutorAllocationClient {

/**
* Express a preference to the cluster manager for a given total number of executors.
* This can result in canceling pending requests or filing additional requests.
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
* we'd want to be allocated.
* @param localityAwareTasks The number of tasks in all active stages that have locality
* preferences. This includes running, pending, and completed tasks.
* @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
* that would like to run on that host.
* This includes running, pending, and completed tasks.
* @return whether the request is acknowledged by the cluster manager.
*/
private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
private[spark] def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean

/**
* Request an additional number of executors from the cluster manager.
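To make the widened contract concrete, here is a hedged, standalone sketch: a toy trait mirroring the new three-argument signature (not the real private[spark] API; all names below are illustrative), together with the kind of arguments a caller such as ExecutorAllocationManager now passes.

// Toy stand-in for illustration only; the real trait lives in org.apache.spark.
trait ToyAllocationClient {
  def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean
}

object PrintingClient extends ToyAllocationClient {
  override def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean = {
    println(s"target=$numExecutors localityAware=$localityAwareTasks hints=$hostToLocalTaskCount")
    true
  }
}

// 10 executors wanted; 6 tasks have locality preferences, 4 preferring host-a and 2 host-b.
PrintingClient.requestTotalExecutors(10, 6, Map("host-a" -> 4, "host-b" -> 2))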
@@ -161,6 +161,12 @@ private[spark] class ExecutorAllocationManager(
// (2) an executor idle timeout has elapsed.
@volatile private var initializing: Boolean = true

// Number of locality aware tasks, used for executor placement.
private var localityAwareTasks = 0

// Map from host to the number of tasks that could run on it, used for executor placement.
private var hostToLocalTaskCount: Map[String, Int] = Map.empty

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -295,7 +301,7 @@ private[spark] class ExecutorAllocationManager(

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
}
@@ -349,7 +355,8 @@ private[spark] class ExecutorAllocationManager(
return 0
}

val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
val addRequestAcknowledged = testing ||
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
if (addRequestAcknowledged) {
val executorsString = "executor" + { if (delta > 1) "s" else "" }
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
@@ -519,13 +526,37 @@
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Map from stageId to a tuple of (the number of tasks with locality preferences, a map from
// node to the number of tasks that would like to be scheduled on it). These per-stage
// executor placement hints are used by the resource framework to better place executors.
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
stageIdToNumTasks(stageId) = numTasks
allocationManager.onSchedulerBacklogged()

// Compute the number of tasks requested by the stage on each host
var numTasksPending = 0
val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
if (!locality.isEmpty) {
numTasksPending += 1
locality.foreach { location =>
val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
hostToLocalTaskCountPerStage(location.host) = count
}
}
}
stageIdToExecutorPlacementHints.put(stageId,
(numTasksPending, hostToLocalTaskCountPerStage.toMap))

// Update the executor placement hints
updateExecutorPlacementHints()
}
}

@@ -534,6 +565,10 @@ private[spark] class ExecutorAllocationManager(
allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToTaskIndices -= stageId
stageIdToExecutorPlacementHints -= stageId

// Update the executor placement hints
updateExecutorPlacementHints()

// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
@@ -637,6 +672,29 @@ private[spark] class ExecutorAllocationManager(
def isExecutorIdle(executorId: String): Boolean = {
!executorIdToTaskIds.contains(executorId)
}

/**
* Update the Executor placement hints (the number of tasks with locality preferences,
* a map where each pair is a node and the number of tasks that would like to be scheduled
* on that node).
*
* These hints are updated when stages arrive and complete, so are not up-to-date at task
* granularity within stages.
*/
def updateExecutorPlacementHints(): Unit = {
var localityAwareTasks = 0
val localityToCount = new mutable.HashMap[String, Int]()
stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
localityAwareTasks += numTasksPending
localities.foreach { case (hostname, count) =>
val updatedCount = localityToCount.getOrElse(hostname, 0) + count
localityToCount(hostname) = updatedCount
}
}

allocationManager.localityAwareTasks = localityAwareTasks
allocationManager.hostToLocalTaskCount = localityToCount.toMap
}
}

/**
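The aggregation that updateExecutorPlacementHints performs can be restated over plain collections, which makes the two outputs (total locality-aware tasks and the merged host-to-count map) easier to see. A hedged sketch with illustrative data; none of the names or numbers below come from the commit:

// Each entry: (number of locality-aware tasks in a stage, host -> preferred task count).
def aggregateHints(stageHints: Iterable[(Int, Map[String, Int])]): (Int, Map[String, Int]) = {
  val localityAwareTasks = stageHints.map(_._1).sum
  val hostToLocalTaskCount = stageHints
    .flatMap(_._2)
    .groupBy(_._1)
    .mapValues(_.map(_._2).sum)
    .toMap
  (localityAwareTasks, hostToLocalTaskCount)
}

// Stage 1: 3 locality-aware tasks (2 prefer host-a, 1 prefers host-b).
// Stage 2: 2 locality-aware tasks (both prefer host-a).
aggregateHints(Seq((3, Map("host-a" -> 2, "host-b" -> 1)), (2, Map("host-a" -> 2))))
// => (5, Map("host-a" -> 4, "host-b" -> 1))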
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -56,7 +56,7 @@ object Partitioner {
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined) {
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
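The extra `numPartitions > 0` guard matters when one of the inputs carries a partitioner with zero partitions, for example an empty pair RDD that was explicitly partitioned; without the guard, defaultPartitioner could reuse that partitioner and the resulting join would have no output partitions. A hedged reproduction sketch (assumes a SparkContext named `sc`; the scenario and names are illustrative, not code from this commit):

import org.apache.spark.HashPartitioner

val empty = sc.emptyRDD[(Int, String)].partitionBy(new HashPartitioner(0))
val data  = sc.parallelize(Seq((1, "a"), (2, "b")), 4)

// Before this change the zero-partition partitioner from `empty` could be picked,
// so the join produced zero partitions; with the guard it falls back to a sane count.
val joined = data.join(empty)
println(joined.partitions.length)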
25 changes: 19 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1382,16 +1382,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* Express a preference to the cluster manager for a given total number of executors.
* This can result in canceling pending requests or filing additional requests.
* This is currently only supported in YARN mode. Return whether the request is received.
*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
* we'd want to be allocated.
* @param localityAwareTasks The number of tasks in all active stages that have locality
* preferences. This includes running, pending, and completed tasks.
* @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
* that would like to run on that host.
* This includes running, pending, and completed tasks.
* @return whether the request is acknowledged by the cluster manager.
*/
private[spark] override def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
false
(diff listing truncated; the remaining changed files are not shown here)
