Skip to content

Commit

Permalink
Merge branch 'master' into stratified
Browse files Browse the repository at this point in the history
  • Loading branch information
dorx committed Jul 6, 2014
2 parents ee9d260 + 9d5ecf8 commit bbfb8c9
Show file tree
Hide file tree
Showing 39 changed files with 489 additions and 180 deletions.
8 changes: 4 additions & 4 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. "
echo "This is likely because Spark was compiled with Java 7 and run "
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark "
echo "or build Spark with Java 6."
echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
echo "or build Spark with Java 6." 1>&2
exit 1
fi

Expand Down
6 changes: 3 additions & 3 deletions bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR"
SCALA_VERSION=2.10

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./bin/pyspark [options]"
echo "Usage: ./bin/pyspark [options]" 1>&2
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi
Expand All @@ -36,8 +36,8 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
# Exit if the user hasn't compiled Spark
ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
if [[ $? != 0 ]]; then
echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
echo "You need to build Spark before running this program" >&2
echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
fi
Expand Down
10 changes: 5 additions & 5 deletions bin/run-example
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
shift
else
echo "Usage: ./bin/run-example <example-class> [example-args]"
echo " - set MASTER=XX to use a specific master"
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)"
echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
echo " - set MASTER=XX to use a specific master" 1>&2
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
exit 1
fi

Expand All @@ -40,8 +40,8 @@ elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.ja
fi

if [[ -z $SPARK_EXAMPLES_JAR ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
echo "You need to build Spark before running this program" >&2
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi

Expand Down
13 changes: 6 additions & 7 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export SPARK_HOME="$FWDIR"
. $FWDIR/bin/load-spark-env.sh

if [ -z "$1" ]; then
echo "Usage: spark-class <class> [<args>]" >&2
echo "Usage: spark-class <class> [<args>]" 1>&2
exit 1
fi

if [ -n "$SPARK_MEM" ]; then
echo "Warning: SPARK_MEM is deprecated, please use a more specific config option"
echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)."
echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2
fi

# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
Expand Down Expand Up @@ -147,10 +147,9 @@ fi
export CLASSPATH

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: "
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
echo -n "Spark Command: " 1>&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
echo -e "========================================\n" 1>&2
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
} else {
// This happens on the master, where we pass the updates to Python through a socket
val socket = new Socket(serverHost, serverPort)
// SPARK-2282: Immediately reuse closed sockets because we create one per task.
socket.setReuseAddress(true)
val in = socket.getInputStream
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.util.{SignalLogger, Utils}

/**
* A web server that renders SparkUIs of completed applications.
Expand Down Expand Up @@ -169,10 +169,11 @@ class HistoryServer(
*
* This launches the HistoryServer as a Spark daemon.
*/
object HistoryServer {
object HistoryServer extends Logging {
private val conf = new SparkConf

def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
broadcastedConf.synchronized {
val newJobConf = new JobConf(broadcastedConf.value.value)
conf.synchronized {
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.util.BoundedPriorityQueue

import scala.reflect.ClassTag

Expand Down Expand Up @@ -183,7 +184,8 @@ private[serializer] object KryoSerializer {
classOf[GetBlock],
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]]
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]]
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
*/
@DeveloperApi
class StorageStatusListener extends SparkListener {
private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

def storageStatusList = executorIdToStorageStatus.values.toSeq

/** Update storage status list to reflect updated block statuses */
def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
val filteredStatus = executorIdToStorageStatus.get(execId)
filteredStatus.foreach { storageStatus =>
updatedBlocks.foreach { case (blockId, updatedStatus) =>
storageStatus.blocks(blockId) = updatedStatus
if (updatedStatus.storageLevel == StorageLevel.NONE) {
storageStatus.blocks.remove(blockId)
} else {
storageStatus.blocks(blockId) = updatedStatus
}
}
}
}

/** Update storage status list to reflect the removal of an RDD from the cache */
def updateStorageStatus(unpersistedRDDId: Int) {
private def updateStorageStatus(unpersistedRDDId: Int) {
storageStatusList.foreach { storageStatus =>
val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
unpersistedBlocksIds.foreach { blockId =>
storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
storageStatus.blocks.remove(blockId)
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,26 @@ private[spark] object StorageUtils {
/** Returns storage information of all RDDs in the given list. */
def rddInfoFromStorageStatus(
storageStatuses: Seq[StorageStatus],
rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
rddInfos: Seq[RDDInfo],
updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {

// Mapping from a block ID -> its status
val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)

// Record updated blocks, if any
updatedBlocks
.collect { case (id: RDDBlockId, status) => (id, status) }
.foreach { case (id, status) => blockMap(id) = status }

// Mapping from RDD ID -> an array of associated BlockStatuses
val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
val rddBlockMap = blockMap
.groupBy { case (k, _) => k.rddId }
.mapValues(_.values.toArray)

// Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap

val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
// Add up memory, disk and Tachyon sizes
val persistedBlocks =
blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.blocks.count { case (_, blockStatus) =>
blockStatus.storageLevel != StorageLevel.NONE
}
val rddBlocks = status.blocks.size
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
val executorToTasksActive = HashMap[String, Int]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.mutable
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ui._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
import org.apache.spark.storage._

/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
Expand All @@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
* A SparkListener that prepares information to be displayed on the BlockManagerUI.
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {

class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
private val _rddInfoMap = mutable.Map[Int, RDDInfo]()

def storageStatusList = storageStatusListener.storageStatusList
Expand All @@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq

/** Update each RDD's info to reflect any updates to the RDD's storage status */
private def updateRDDInfo() {
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
val rddInfos = _rddInfoMap.values.toSeq
val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
val updatedRddInfos =
StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
}

Expand All @@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val metrics = taskEnd.taskMetrics
if (metrics != null && metrics.updatedBlocks.isDefined) {
updateRDDInfo()
updateRDDInfo(metrics.updatedBlocks.get)
}
}

Expand All @@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
}

override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
updateRDDInfo()
_rddInfoMap.remove(unpersistRDD.rddId)
}
}
Loading

0 comments on commit bbfb8c9

Please sign in to comment.