diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 2cf4e381c1c88..e81e8c060cb98 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -81,10 +81,10 @@ ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null) # Verify that versions of java used to build the jars and run Spark are compatible jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1) if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then - echo "Loading Spark jar with '$JAR_CMD' failed. " - echo "This is likely because Spark was compiled with Java 7 and run " - echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " - echo "or build Spark with Java 6." + echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2 + echo "This is likely because Spark was compiled with Java 7 and run " 1>&2 + echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2 + echo "or build Spark with Java 6." 1>&2 exit 1 fi diff --git a/bin/pyspark b/bin/pyspark index 0b5ed40e2157d..69b056fe28f2c 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./bin/pyspark [options]" + echo "Usage: ./bin/pyspark [options]" 1>&2 $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 fi @@ -36,8 +36,8 @@ if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null if [[ $? != 0 ]]; then - echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2 - echo "You need to build Spark before running this program" >&2 + echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2 + echo "You need to build Spark before running this program" 1>&2 exit 1 fi fi diff --git a/bin/run-example b/bin/run-example index e7a5fe3914fbd..942706d733122 100755 --- a/bin/run-example +++ b/bin/run-example @@ -27,9 +27,9 @@ if [ -n "$1" ]; then EXAMPLE_CLASS="$1" shift else - echo "Usage: ./bin/run-example [example-args]" - echo " - set MASTER=XX to use a specific master" - echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" + echo "Usage: ./bin/run-example [example-args]" 1>&2 + echo " - set MASTER=XX to use a specific master" 1>&2 + echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2 exit 1 fi @@ -40,8 +40,8 @@ elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.ja fi if [[ -z $SPARK_EXAMPLES_JAR ]]; then - echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2 - echo "You need to build Spark before running this program" >&2 + echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2 + echo "You need to build Spark before running this program" 1>&2 exit 1 fi diff --git a/bin/spark-class b/bin/spark-class index 60d9657c0ffcd..04fa52c6756b1 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -33,13 +33,13 @@ export SPARK_HOME="$FWDIR" . $FWDIR/bin/load-spark-env.sh if [ -z "$1" ]; then - echo "Usage: spark-class []" >&2 + echo "Usage: spark-class []" 1>&2 exit 1 fi if [ -n "$SPARK_MEM" ]; then - echo "Warning: SPARK_MEM is deprecated, please use a more specific config option" - echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 
+ echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2 + echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2 fi # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options @@ -147,10 +147,9 @@ fi export CLASSPATH if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo -n "Spark Command: " - echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" - echo "========================================" - echo + echo -n "Spark Command: " 1>&2 + echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 + echo -e "========================================\n" 1>&2 fi exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" diff --git a/core/pom.xml b/core/pom.xml index 8c23842730e37..6abf8480d5da0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,12 @@ org.apache.hadoop hadoop-client + + + javax.servlet + servlet-api + + net.java.dev.jets3t diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index a8bc141208a94..11fd956bfbe66 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -95,6 +95,10 @@ span.expand-details { float: right; } +pre { + font-size: 0.8em; +} + .stage-details { max-height: 100px; overflow-y: auto; diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index f30a28b2cfeed..ee25458f2ad48 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.executor.InputMetrics import org.apache.spark.rdd.RDD import org.apache.spark.storage._ import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer @@ -50,9 +51,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") blockManager.get(key) match { - case Some(values) => + case Some(blockResult) => // Partition is already materialized, so just return its values - new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => // Acquire a lock for loading this partition @@ -119,7 +121,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Whoever was loading $id failed; we'll try it ourselves") loading.add(id) } - values.map(_.asInstanceOf[Iterator[T]]) + values.map(_.data.asInstanceOf[Iterator[T]]) } } } @@ -141,7 +143,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * exceptions that can be avoided. 
*/ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) blockManager.get(key) match { - case Some(v) => v.asInstanceOf[Iterator[T]] + case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => logInfo(s"Failure to store $key") throw new BlockException(key, s"Block manager failed to return cached value for $key!") diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 182abacc475ae..894091761485d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -26,10 +26,10 @@ import scala.concurrent.Await import akka.actor._ import akka.pattern.ask -import org.apache.spark.util._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util._ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) @@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging Await.result(future, timeout) } catch { case e: Exception => + logError("Error communicating with MapOutputTracker", e) throw new SparkException("Error communicating with MapOutputTracker", e) } } /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */ protected def sendTracker(message: Any) { - if (askTracker(message) != true) { - throw new SparkException("Error reply received from MapOutputTracker") + val response = askTracker(message) + if (response != true) { + throw new SparkException( + "Error reply received from MapOutputTracker. Expecting true, got " + response.toString) } } @@ -366,9 +369,9 @@ private[spark] object MapOutputTracker { // any of the statuses is null (indicating a missing location due to a failed mapper), // throw a FetchFailedException. private def convertMapStatuses( - shuffleId: Int, - reduceId: Int, - statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { + shuffleId: Int, + reduceId: Int, + statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) statuses.map { status => @@ -403,7 +406,7 @@ private[spark] object MapOutputTracker { if (compressedSize == 0) { 0 } else { - math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong + math.pow(LOG_BASE, compressedSize & 0xFF).toLong } } } diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index e7f75481939a8..ec99648a8488a 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -17,11 +17,13 @@ package org.apache.spark +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + import scala.reflect.ClassTag import org.apache.spark.rdd.RDD -import org.apache.spark.util.CollectionsUtils -import org.apache.spark.util.Utils +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{CollectionsUtils, Utils} /** * An object that defines how the elements in a key-value pair RDD are partitioned by key. @@ -96,15 +98,15 @@ class HashPartitioner(partitions: Int) extends Partitioner { * the value of `partitions`. 
*/ class RangePartitioner[K : Ordering : ClassTag, V]( - partitions: Int, + @transient partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], - private val ascending: Boolean = true) + private var ascending: Boolean = true) extends Partitioner { - private val ordering = implicitly[Ordering[K]] + private var ordering = implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions - private val rangeBounds: Array[K] = { + private var rangeBounds: Array[K] = { if (partitions == 1) { Array() } else { @@ -127,7 +129,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( def numPartitions = rangeBounds.length + 1 - private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] + private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] @@ -173,4 +175,40 @@ class RangePartitioner[K : Ordering : ClassTag, V]( result = prime * result + ascending.hashCode result } + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => out.defaultWriteObject() + case _ => + out.writeBoolean(ascending) + out.writeObject(ordering) + out.writeObject(binarySearch) + + val ser = sfactory.newInstance() + Utils.serializeViaNestedStream(out, ser) { stream => + stream.writeObject(scala.reflect.classTag[Array[K]]) + stream.writeObject(rangeBounds) + } + } + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => in.defaultReadObject() + case _ => + ascending = in.readBoolean() + ordering = in.readObject().asInstanceOf[Ordering[K]] + binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int] + + val ser = sfactory.newInstance() + Utils.deserializeViaNestedStream(in, ser) { ds => + implicit val classTag = ds.readObject[ClassTag[Array[K]]]() + rangeBounds = ds.readObject[Array[K]]() + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f9476ff826a62..8819e73d17fb2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1203,9 +1203,17 @@ class SparkContext(config: SparkConf) extends Logging { /** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) + * If checkSerializable is set, clean will also proactively + * check to see if f is serializable and throw a SparkException + * if not. 
+ * + * @param f the closure to clean + * @param checkSerializable whether or not to immediately check f for serializability + * @throws SparkException if checkSerializable is set but f is not + * serializable */ - private[spark] def clean[F <: AnyRef](f: F): F = { - ClosureCleaner.clean(f) + private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) f } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 0d6bce064525b..eb97f1adbbd86 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -187,6 +187,7 @@ object SparkEnv extends Logging { val serializer = instantiateClass[Serializer]( "spark.serializer", "org.apache.spark.serializer.JavaSerializer") + logDebug(s"Using serializer: ${serializer.getClass}") val closureSerializer = instantiateClass[Serializer]( "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer") diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 4351ed74b67fc..2ebd7a7151a59 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable) def this(message: String) = this(message, null) } + +/** + * Exception thrown when execution of some user code in the driver process fails, e.g. + * accumulator update fails or failure in takeOrdered (user supplies an Ordering implementation + * that can be misbehaving. + */ +private[spark] class SparkDriverExecutionException(cause: Throwable) + extends SparkException("Execution error", cause) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 7e9a9344e61f9..b050dccb6d57f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy import java.io.{File, PrintStream} import java.lang.reflect.InvocationTargetException -import java.net.{URI, URL} +import java.net.URL import scala.collection.mutable.{ArrayBuffer, HashMap, Map} @@ -117,14 +117,25 @@ object SparkSubmit { val isPython = args.isPython val isYarnCluster = clusterManager == YARN && deployOnCluster + // For mesos, only client mode is supported if (clusterManager == MESOS && deployOnCluster) { - printErrorAndExit("Cannot currently run driver on the cluster in Mesos") + printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") + } + + // For standalone, only client mode is supported + if (clusterManager == STANDALONE && deployOnCluster) { + printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.") + } + + // For shells, only client mode is applicable + if (isShell(args.primaryResource) && deployOnCluster) { + printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") } // If we're running a python app, set the main class to our specific python runner if (isPython) { if (deployOnCluster) { - printErrorAndExit("Cannot currently run Python driver programs on cluster") + printErrorAndExit("Cluster deploy mode is currently not supported for python.") } if (args.primaryResource == PYSPARK_SHELL) { args.mainClass = "py4j.GatewayServer" 
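Not part of the patch: a hedged sketch, assuming a built Spark distribution and a hypothetical application jar (app.jar) with main class Main, of how the new deploy-mode checks added to SparkSubmit above surface on the command line.

    # Rejected up front by the new checks (messages quoted from the diff):
    ./bin/spark-submit --master mesos://host:5050 --deploy-mode cluster --class Main app.jar
    #   "Cluster deploy mode is currently not supported for Mesos clusters."
    ./bin/spark-submit --master spark://host:7077 --deploy-mode cluster --class Main app.jar
    #   "Cluster deploy mode is currently not supported for standalone clusters."
    # Still accepted: client mode everywhere, and cluster deploy mode on YARN.
    ./bin/spark-submit --master yarn --deploy-mode cluster --class Main app.jar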
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f1032ea8dbada..57655aa4c32b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -338,8 +338,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { """Usage: spark-submit [options] [app options] |Options: | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. - | --deploy-mode DEPLOY_MODE Where to run the driver program: either "client" to run - | on the local machine, or "cluster" to run inside cluster. + | --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or + | on one of the worker machines inside the cluster ("cluster") + | (Default: client). | --class CLASS_NAME Your application's main class (for Java / Scala apps). | --name NAME A name of your application. | --jars JARS Comma-separated list of local jars to include on the driver diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 33ffcbd216954..11545b8203707 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -41,7 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} private[spark] class Master( host: String, @@ -755,12 +755,13 @@ private[spark] class Master( } } -private[spark] object Master { +private[spark] object Master extends Logging { val systemName = "sparkMaster" private val actorName = "Master" val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r def main(argStrings: Array[String]) { + SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index a0ecaf709f8e2..ce425443051b0 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} /** * @param masterUrls Each url should look like spark://host:port. 
@@ -365,8 +365,9 @@ private[spark] class Worker( } } -private[spark] object Worker { +private[spark] object Worker extends Logging { def main(argStrings: Array[String]) { + SignalLogger.register(log) val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b5fd334f40203..8d31bd05fdbec 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, @@ -97,10 +97,12 @@ private[spark] class CoarseGrainedExecutorBackend( } } -private[spark] object CoarseGrainedExecutorBackend { +private[spark] object CoarseGrainedExecutorBackend extends Logging { def run(driverUrl: String, executorId: String, hostname: String, cores: Int, workerUrl: Option[String]) { + SignalLogger.register(log) + SparkHadoopUtil.get.runAsSparkUser { () => // Debug code Utils.checkHost(hostname) diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 74100498bb2bd..2232e6237bf26 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -25,8 +25,8 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} import org.apache.spark.{Logging, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.util.{SignalLogger, Utils} private[spark] class MesosExecutorBackend extends MesosExecutor @@ -93,8 +93,9 @@ private[spark] class MesosExecutorBackend /** * Entry point for Mesos executor. */ -private[spark] object MesosExecutorBackend { +private[spark] object MesosExecutorBackend extends Logging { def main(args: Array[String]) { + SignalLogger.register(log) SparkHadoopUtil.get.runAsSparkUser { () => MesosNativeLibrary.load() // Create a new Executor and start it running diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 350fd74173f65..ac73288442a74 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -66,6 +66,12 @@ class TaskMetrics extends Serializable { */ var diskBytesSpilled: Long = _ + /** + * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read + * are stored here. 
+ */ + var inputMetrics: Option[InputMetrics] = None + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ @@ -87,6 +93,29 @@ private[spark] object TaskMetrics { def empty: TaskMetrics = new TaskMetrics } +/** + * :: DeveloperApi :: + * Method by which input data was read. Network means that the data was read over the network + * from a remote block manager (which may have stored the data on-disk or in-memory). + */ +@DeveloperApi +object DataReadMethod extends Enumeration with Serializable { + type DataReadMethod = Value + val Memory, Disk, Hadoop, Network = Value +} + +/** + * :: DeveloperApi :: + * Metrics about reading input data. + */ +@DeveloperApi +case class InputMetrics(readMethod: DataReadMethod.Value) { + /** + * Total bytes read. + */ + var bytesRead: Long = 0L +} + /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index c64da8804d166..2673ec22509e9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds val blockManager = SparkEnv.get.blockManager val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { - case Some(block) => block.asInstanceOf[Iterator[T]] + case Some(block) => block.data.asInstanceOf[Iterator[T]] case None => throw new Exception("Could not compute split, block " + blockId + " not found") } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2aa111d600e9b..98dcbf4e2dbfa 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -38,6 +38,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.util.NextIterator /** @@ -196,6 +197,20 @@ class HadoopRDD[K, V]( context.addOnCompleteCallback{ () => closeIfNeeded() } val key: K = reader.createKey() val value: V = reader.createValue() + + // Set the task input metrics. + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. 
*/ + inputMetrics.bytesRead = split.inputSplit.value.getLength() + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + override def getNext() = { try { finished = !reader.next(key, value) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index ac1ccc06f238a..f2b3a64bf1345 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -31,6 +31,7 @@ import org.apache.spark.Logging import org.apache.spark.Partition import org.apache.spark.SerializableWritable import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.executor.{DataReadMethod, InputMetrics} private[spark] class NewHadoopPartition( rddId: Int, @@ -112,6 +113,18 @@ class NewHadoopRDD[K, V]( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. */ + inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() + } catch { + case e: Exception => + logWarning("Unable to get input split size in order to set task input bytes", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback(() => close()) var havePair = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c8559a7a82868..81c136d970312 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -581,8 +581,9 @@ class DAGScheduler( } } catch { case e: Exception => - jobResult = JobFailed(e) - job.listener.jobFailed(e) + val exception = new SparkDriverExecutionException(e) + jobResult = JobFailed(exception) + job.listener.jobFailed(exception) case oom: OutOfMemoryError => val exception = new SparkException("Local job aborted due to out of memory error", oom) jobResult = JobFailed(exception) @@ -622,16 +623,6 @@ class DAGScheduler( } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { - for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) { - if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 && - !stageInfo.emittedTaskSizeWarning) { - stageInfo.emittedTaskSizeWarning = true - logWarning(("Stage %d (%s) contains a task of very large " + - "size (%d KB). The maximum recommended task size is %d KB.").format( - task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, - DAGScheduler.TASK_SIZE_TO_WARN)) - } - } listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) submitWaitingStages() } @@ -822,6 +813,7 @@ class DAGScheduler( case Success => logInfo("Completed " + task) if (event.accumUpdates != null) { + // TODO: fail the stage if the accumulator update fails... 
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted } pendingTasks(stage) -= task @@ -838,7 +830,16 @@ class DAGScheduler( cleanupStateForJobAndIndependentStages(job, Some(stage)) listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) } - job.listener.taskSucceeded(rt.outputId, event.result) + + // taskSucceeded runs some user code that might throw an exception. Make sure + // we are resilient against that. + try { + job.listener.taskSucceeded(rt.outputId, event.result) + } catch { + case e: Exception => + // TODO: Perhaps we want to mark the stage as failed? + job.listener.jobFailed(new SparkDriverExecutionException(e)) + } } case None => logInfo("Ignoring result from " + rt + " because its job has finished") @@ -1161,8 +1162,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) override val supervisorStrategy = OneForOneStrategy() { case x: Exception => - logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" - .format(x.getMessage)) + logError("eventProcesserActor failed; shutting down SparkContext", x) try { dagScheduler.doCancelAllJobs() } catch { @@ -1244,7 +1244,4 @@ private[spark] object DAGScheduler { // The time, in millis, to wake up between polls of the completion queue in order to potentially // resubmit failed stages val POLL_TIMEOUT = 10L - - // Warns the user if a stage contains a task with size greater than this value (in KB) - val TASK_SIZE_TO_WARN = 100 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index a1e21cad48b9b..47dd112f68325 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * :: DeveloperApi :: @@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = taskMetrics.shuffleReadMetrics match { + val inputMetrics = taskMetrics.inputMetrics match { + case Some(metrics) => + " READ_METHOD=" + metrics.readMethod.toString + + " INPUT_BYTES=" + metrics.bytesRead + case None => "" + } + val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + @@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten case None => "" } - stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics) + stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics + + writeMetrics) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 7644e3f351b3c..480891550eb60 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -38,8 +38,6 @@ class StageInfo( /** If the stage failed, the reason why. */ var failureReason: Option[String] = None - var emittedTaskSizeWarning = false - def stageFailed(reason: String) { failureReason = Some(reason) completionTime = Some(System.currentTimeMillis) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 6aecdfe8e6656..29de0453ac19a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -49,8 +49,6 @@ class TaskInfo( var failed = false - var serializedSize: Int = 0 - private[spark] def markGettingResult(time: Long = System.currentTimeMillis) { gettingResultTime = time } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 83ff6b8550b4f..059cc9085a2e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -166,6 +166,8 @@ private[spark] class TaskSetManager( override def schedulingMode = SchedulingMode.NONE + var emittedTaskSizeWarning = false + /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. @@ -418,6 +420,13 @@ private[spark] class TaskSetManager( // we assume the task can be serialized without exceptions. val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && + !emittedTaskSizeWarning) { + emittedTaskSizeWarning = true + logWarning(s"Stage ${task.stageId} contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } val timeTaken = clock.getTime() - startTime addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( @@ -764,3 +773,9 @@ private[spark] class TaskSetManager( localityWaits = myLocalityLevels.map(getLocalityWait) } } + +private[spark] object TaskSetManager { + // The user will be warned if any stages contain a task that has a serialized size greater than + // this. + val TASK_SIZE_TO_WARN_KB = 100 +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 002f03ff622e4..09fc793a357a8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer @@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues +/* Class for returning a fetched block and associated metrics. 
*/ +private[spark] class BlockResult( + val data: Iterator[Any], + readMethod: DataReadMethod.Value, + bytes: Long) { + val inputMetrics = new InputMetrics(readMethod) + inputMetrics.bytesRead = bytes +} + private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, @@ -334,9 +344,9 @@ private[spark] class BlockManager( /** * Get block from local block manager. */ - def getLocal(blockId: BlockId): Option[Iterator[Any]] = { + def getLocal(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting local block $blockId") - doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -355,11 +365,11 @@ private[spark] class BlockManager( blockId, s"Block $blockId not found on disk, though it should be") } } else { - doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } } - private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { @@ -386,14 +396,14 @@ private[spark] class BlockManager( // Look for the block in memory if (level.useMemory) { logDebug(s"Getting block $blockId from memory") - val result = if (asValues) { - memoryStore.getValues(blockId) + val result = if (asBlockResult) { + memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size)) } else { memoryStore.getBytes(blockId) } result match { case Some(values) => - return Some(values) + return result case None => logDebug(s"Block $blockId not found in memory") } @@ -405,10 +415,11 @@ private[spark] class BlockManager( if (tachyonStore.contains(blockId)) { tachyonStore.getBytes(blockId) match { case Some(bytes) => - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { - return Some(dataDeserialize(blockId, bytes)) + return Some(new BlockResult( + dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size)) } case None => logDebug(s"Block $blockId not found in tachyon") @@ -429,14 +440,15 @@ private[spark] class BlockManager( if (!level.useMemory) { // If the block shouldn't be stored in memory, we can just return it - if (asValues) { - return Some(dataDeserialize(blockId, bytes)) + if (asBlockResult) { + return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, + info.size)) } else { return Some(bytes) } } else { // Otherwise, we also have to store something in the memory store - if (!level.deserialized || !asValues) { + if (!level.deserialized || !asBlockResult) { /* We'll store the bytes in memory if the block's storage level includes * "memory serialized", or if it should be cached as objects in memory * but we only requested its serialized bytes. 
*/ @@ -445,7 +457,7 @@ private[spark] class BlockManager( memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() } - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { val values = dataDeserialize(blockId, bytes) @@ -453,12 +465,12 @@ private[spark] class BlockManager( // Cache the values before returning them memoryStore.putValues(blockId, values, level, returnValues = true).data match { case Left(values2) => - return Some(values2) + return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) case _ => throw new SparkException("Memory store did not return an iterator") } } else { - return Some(values) + return Some(new BlockResult(values, DataReadMethod.Disk, info.size)) } } } @@ -473,9 +485,9 @@ private[spark] class BlockManager( /** * Get block from remote block managers. */ - def getRemote(blockId: BlockId): Option[Iterator[Any]] = { + def getRemote(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting remote block $blockId") - doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -483,10 +495,10 @@ private[spark] class BlockManager( */ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug(s"Getting remote block $blockId as bytes") - doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } - private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { @@ -494,8 +506,11 @@ private[spark] class BlockManager( val data = BlockManagerWorker.syncGetBlock( GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { - if (asValues) { - return Some(dataDeserialize(blockId, data)) + if (asBlockResult) { + return Some(new BlockResult( + dataDeserialize(blockId, data), + DataReadMethod.Network, + data.limit())) } else { return Some(data) } @@ -509,7 +524,7 @@ private[spark] class BlockManager( /** * Get a block from the block manager (either local or remote). */ - def get(blockId: BlockId): Option[Iterator[Any]] = { + def get(blockId: BlockId): Option[BlockResult] = { val local = getLocal(blockId) if (local.isDefined) { logInfo(s"Found block $blockId locally") @@ -788,7 +803,7 @@ private[spark] class BlockManager( * Read a block consisting of a single object. */ def getSingle(blockId: BlockId): Option[Any] = { - get(blockId).map(_.next()) + get(blockId).map(_.data.next()) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 2ec46d416f37d..673fc19c060a4 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,6 +44,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. 
*/ private val localDirs: Array[File] = createLocalDirs() + if (localDirs.isEmpty) { + logError("Failed to create any local dir.") + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null @@ -116,7 +120,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def createLocalDirs(): Array[File] = { logDebug(s"Creating local directories at root dirs '$rootDirs'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").map { rootDir => + rootDirs.split(",").flatMap { rootDir => var foundLocalDir = false var localDir: File = null var localDirId: String = null @@ -136,11 +140,13 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } } if (!foundLocalDir) { - logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir") - System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." + + " Ignoring this directory.") + None + } else { + logInfo(s"Created local directory at $localDir") + Some(localDir) } - logInfo(s"Created local directory at $localDir") - localDir } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index c694fc8c347ec..a6e6627d54e01 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -37,11 +37,7 @@ class StorageStatusListener extends SparkListener { val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId) filteredStatus.foreach { storageStatus => updatedBlocks.foreach { case (blockId, updatedStatus) => - if (updatedStatus.storageLevel == StorageLevel.NONE) { - storageStatus.blocks.remove(blockId) - } else { - storageStatus.blocks(blockId) = updatedStatus - } + storageStatus.blocks(blockId) = updatedStatus } } } diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index a107c5182b3be..328be158db680 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -78,7 +78,7 @@ private[spark] object ThreadingTest { val startTime = System.currentTimeMillis() manager.get(blockId) match { case Some(retrievedBlock) => - assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, + assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match") println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 2d8c3b949c1ac..9625337ae21a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -21,6 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.storage.StorageLevel import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils @@ -71,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends 
WebUIPage("") { "Complete Tasks", "Total Tasks", "Task Time", + "Input Bytes", "Shuffle Read", "Shuffle Write") @@ -96,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { {values("Complete Tasks")} {values("Total Tasks")} {Utils.msDurationToString(values("Task Time").toLong)} + {Utils.bytesToString(values("Input Bytes").toLong)} {Utils.bytesToString(values("Shuffle Read").toLong)} {Utils.bytesToString(values("Shuffle Write").toLong)} @@ -107,7 +110,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.blocks.size + val rddBlocks = status.blocks.count { case (_, blockStatus) => + blockStatus.storageLevel != StorageLevel.NONE + } val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed @@ -116,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) @@ -133,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { completedTasks, totalTasks, totalDuration, + totalInputBytes, totalShuffleRead, totalShuffleWrite, maxMem diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 91d37b835b19d..58eeb86bf9a3a 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() + val executorToInputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() @@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) // Update shuffle read/write val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { inputMetrics => + executorToInputBytes(eid) = + executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead + } metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 2aaf6329b792d..c4a8996c0b9a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -28,6 +28,7 @@ class ExecutorSummary { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 + var inputBytes: Long = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index add0e9878a546..2a34a9af925d6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { Total Tasks Failed Tasks Succeeded Tasks + Input Bytes Shuffle Read Shuffle Write Shuffle Spill (Memory) @@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} + {Utils.bytesToString(v.inputBytes)} {Utils.bytesToString(v.shuffleRead)} {Utils.bytesToString(v.shuffleWrite)} {Utils.bytesToString(v.memoryBytesSpilled)} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 381a5443df8b5..2286a7f952f28 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - // Total metrics reflect metrics only for completed tasks - var totalTime = 0L - var totalShuffleRead = 0L - var totalShuffleWrite = 0L - // TODO: Should probably consolidate all following into a single hash map. val stageIdToTime = HashMap[Int, Long]() + val stageIdToInputBytes = HashMap[Int, Long]() val stageIdToShuffleRead = HashMap[Int, Long]() val stageIdToShuffleWrite = HashMap[Int, Long]() val stageIdToMemoryBytesSpilled = HashMap[Int, Long]() @@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val toRemove = math.max(retainedStages / 10, 1) stages.take(toRemove).foreach { s => stageIdToTime.remove(s.stageId) + stageIdToInputBytes.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) stageIdToShuffleWrite.remove(s.stageId) stageIdToMemoryBytesSpilled.remove(s.stageId) @@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead } metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } y.memoryBytesSpilled += metrics.memoryBytesSpilled @@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { stageIdToTime.getOrElseUpdate(sid, 0L) val time = metrics.map(_.executorRunTime).getOrElse(0L) stageIdToTime(sid) += time - totalTime += time + + stageIdToInputBytes.getOrElseUpdate(sid, 0L) + val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L) + stageIdToInputBytes(sid) += inputBytes stageIdToShuffleRead.getOrElseUpdate(sid, 0L) val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L) stageIdToShuffleRead(sid) += shuffleRead - totalShuffleRead += shuffleRead stageIdToShuffleWrite.getOrElseUpdate(sid, 0L) val shuffleWrite = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L) stageIdToShuffleWrite(sid) += shuffleWrite - totalShuffleWrite += shuffleWrite stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L) diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8e3d5d1cd4c6b..afb8ed754ff8b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) + val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L) + val hasInput = inputBytes > 0 val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L) val hasShuffleRead = shuffleReadBytes > 0 val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) @@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Total task time across all tasks: {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} + {if (hasInput) +
+              <li>
+                <strong>Input: </strong>
+                {Utils.bytesToString(inputBytes)}
+              </li>
+            }
            {if (hasShuffleRead)
  • Shuffle read: @@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Seq( "Index", "ID", "Attempt", "Status", "Locality Level", "Executor", "Launch Time", "Duration", "GC Time") ++ + {if (hasInput) Seq("Input") else Nil} ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ Seq("Errors") val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) + val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble + } + val inputQuantiles = "Input" +: getQuantileCols(inputSizes) + val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } @@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, + if (hasInput) inputQuantiles else Nil, if (hasShuffleRead) shuffleReadQuantiles else Nil, if (hasShuffleWrite) shuffleWriteQuantiles else Nil, if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil, @@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } } - def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) - (taskData: TaskUIData): Seq[Node] = { + def taskRow( + hasInput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { taskData match { case TaskUIData(info, metrics, errorMessage) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) @@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + val maybeInput = metrics.flatMap(_.inputMetrics) + val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") + val inputReadable = maybeInput + .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") + .getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") @@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""} --> - {if (shuffleRead) { + {if (hasInput) { + + {inputReadable} + + }} + {if (hasShuffleRead) { {shuffleReadReadable} }} - {if (shuffleWrite) { + {if (hasShuffleWrite) { {writeTimeReadable} @@ -278,7 +307,7 @@ private[ui] class 
StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {shuffleWriteReadable} }} - {if (bytesSpilled) { + {if (hasBytesSpilled) { {memoryBytesSpilledReadable} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 30971f769682f..a9ac6d5bee9c9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -43,6 +43,7 @@ private[ui] class StageTableBase( Submitted Duration Tasks: Succeeded/Total + Input Shuffle Read Shuffle Write } @@ -123,6 +124,11 @@ private[ui] class StageTableBase( case _ => "" } val totalTasks = s.numTasks + val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L) + val inputRead = inputSortable match { + case 0 => "" + case b => Utils.bytesToString(b) + } val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L) val shuffleRead = shuffleReadSortable match { case 0 => "" @@ -150,6 +156,7 @@ private[ui] class StageTableBase( {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)} + {inputRead} {shuffleRead} {shuffleWrite} } @@ -168,7 +175,7 @@ private[ui] class FailedStageTable( override protected def stageRow(s: StageInfo): Seq[Node] = { val basicColumns = super.stageRow(s) - val failureReason = {s.failureReason.getOrElse("")} + val failureReason =
    {s.failureReason.getOrElse("")}
    basicColumns ++ failureReason } } diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 4916d9b86cca5..e3f52f6ff1e63 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.Set import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ -import org.apache.spark.{Logging, SparkException} +import org.apache.spark.{Logging, SparkEnv, SparkException} private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it @@ -101,7 +101,7 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean(func: AnyRef) { + def clean(func: AnyRef, checkSerializable: Boolean = true) { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) @@ -153,6 +153,18 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(func, outer) } + + if (checkSerializable) { + ensureSerializable(func) + } + } + + private def ensureSerializable(func: AnyRef) { + try { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } catch { + case ex: Exception => throw new SparkException("Task not serializable", ex) + } } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 6245b4b8023c2..47eb44b530379 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -26,7 +26,8 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ -import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics, + ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ @@ -189,8 +190,7 @@ private[spark] object JsonProtocol { ("Details" -> stageInfo.details) ~ ("Submission Time" -> submissionTime) ~ ("Completion Time" -> completionTime) ~ - ("Failure Reason" -> failureReason) ~ - ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning) + ("Failure Reason" -> failureReason) } def taskInfoToJson(taskInfo: TaskInfo): JValue = { @@ -204,8 +204,7 @@ private[spark] object JsonProtocol { ("Speculative" -> taskInfo.speculative) ~ ("Getting Result Time" -> taskInfo.gettingResultTime) ~ ("Finish Time" -> taskInfo.finishTime) ~ - ("Failed" -> taskInfo.failed) ~ - ("Serialized Size" -> taskInfo.serializedSize) + ("Failed" -> taskInfo.failed) } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { @@ -213,6 +212,8 @@ private[spark] object JsonProtocol { taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val inputMetrics = + taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) val updatedBlocks = taskMetrics.updatedBlocks.map { blocks => JArray(blocks.toList.map { 
case (id, status) => @@ -230,6 +231,7 @@ private[spark] object JsonProtocol { ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ + ("Input Metrics" -> inputMetrics) ~ ("Updated Blocks" -> updatedBlocks) } @@ -247,6 +249,11 @@ private[spark] object JsonProtocol { ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) } + def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { + ("Data Read Method" -> inputMetrics.readMethod.toString) ~ + ("Bytes Read" -> inputMetrics.bytesRead) + } + def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) val json = taskEndReason match { @@ -478,13 +485,11 @@ private[spark] object JsonProtocol { val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String]) - val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean] val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason - stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning stageInfo } @@ -500,14 +505,12 @@ private[spark] object JsonProtocol { val gettingResultTime = (json \ "Getting Result Time").extract[Long] val finishTime = (json \ "Finish Time").extract[Long] val failed = (json \ "Failed").extract[Boolean] - val serializedSize = (json \ "Serialized Size").extract[Int] val taskInfo = new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative) taskInfo.gettingResultTime = gettingResultTime taskInfo.finishTime = finishTime taskInfo.failed = failed - taskInfo.serializedSize = serializedSize taskInfo } @@ -528,6 +531,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson) metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) + metrics.inputMetrics = + Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson) metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value => value.extract[List[JValue]].map { block => @@ -557,6 +562,13 @@ private[spark] object JsonProtocol { metrics } + def inputMetricsFromJson(json: JValue): InputMetrics = { + val metrics = new InputMetrics( + DataReadMethod.withName((json \ "Data Read Method").extract[String])) + metrics.bytesRead = (json \ "Bytes Read").extract[Long] + metrics + } + def taskEndReasonFromJson(json: JValue): TaskEndReason = { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) diff --git a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala new file mode 100644 index 0000000000000..d769b54fa2fae --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.commons.lang.SystemUtils +import org.slf4j.Logger +import sun.misc.{Signal, SignalHandler} + +/** + * Used to log signals received. This can be very useful in debugging crashes or kills. + * + * Inspired by Colin Patrick McCabe's similar class from Hadoop. + */ +private[spark] object SignalLogger { + + private var registered = false + + /** Register a signal handler to log signals on UNIX-like systems. */ + def register(log: Logger): Unit = synchronized { + if (SystemUtils.IS_OS_UNIX) { + require(!registered, "Can't re-install the signal handlers") + registered = true + + val signals = Seq("TERM", "HUP", "INT") + for (signal <- signals) { + try { + new SignalLoggerHandler(signal, log) + } catch { + case e: Exception => log.warn("Failed to register signal handler " + signal, e) + } + } + log.info("Registered signal handlers for [" + signals.mkString(", ") + "]") + } + } +} + +private sealed class SignalLoggerHandler(name: String, log: Logger) extends SignalHandler { + + val prevHandler = Signal.handle(new Signal(name), this) + + override def handle(signal: Signal): Unit = { + log.error("RECEIVED SIGNAL " + signal.getNumber() + ": SIG" + signal.getName()) + prevHandler.handle(signal) + } +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 40e7521838f2b..89407c8e395d9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C]( if (it.hasNext) { var kc = it.next() kcPairs += kc - val minHash = kc._1.hashCode() + val minHash = getKeyHashCode(kc) while (it.hasNext && it.head._1.hashCode() == minHash) { kc = it.next() kcPairs += kc @@ -294,8 +294,9 @@ class ExternalAppendOnlyMap[K, V, C]( // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = mergeHeap.dequeue() val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) - var (minKey, minCombiner) = minPairs.remove(0) - assert(minKey.hashCode() == minHash) + val minPair = minPairs.remove(0) + var (minKey, minCombiner) = minPair + assert(getKeyHashCode(minPair) == minHash) // For all other streams that may have this key (i.e. have the same minimum key hash), // merge in the corresponding value (if any) from that stream @@ -327,15 +328,16 @@ class ExternalAppendOnlyMap[K, V, C]( * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. 
*/ private class StreamBuffer( - val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)]) + val iterator: BufferedIterator[(K, C)], + val pairs: ArrayBuffer[(K, C)]) extends Comparable[StreamBuffer] { def isEmpty = pairs.length == 0 // Invalid if there are no more pairs in this stream - def minKeyHash = { + def minKeyHash: Int = { assert(pairs.length > 0) - pairs.head._1.hashCode() + getKeyHashCode(pairs.head) } override def compareTo(other: StreamBuffer): Int = { @@ -422,10 +424,22 @@ class ExternalAppendOnlyMap[K, V, C]( } private[spark] object ExternalAppendOnlyMap { + + /** + * Return the key hash code of the given (key, combiner) pair. + * If the key is null, return a special hash code. + */ + private def getKeyHashCode[K, C](kc: (K, C)): Int = { + if (kc._1 == null) 0 else kc._1.hashCode() + } + + /** + * A comparator for (key, combiner) pairs based on their key hash codes. + */ private class KCComparator[K, C] extends Comparator[(K, C)] { def compare(kc1: (K, C), kc2: (K, C)): Int = { - val hash1 = kc1._1.hashCode() - val hash2 = kc2._1.hashCode() + val hash1 = getKeyHashCode(kc1) + val hash2 = getKeyHashCode(kc2) if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1 } } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 20798a34755c5..ba9eda7fb36f5 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -63,7 +64,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get cached rdd") { expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(Some(Array(5, 6, 7).iterator)) + val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12) + blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result)) } whenExecuting(blockManager) { diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 12dbebcb28644..e755d2e309398 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -22,6 +22,8 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ import org.apache.spark.util.NonSerializable +import java.io.NotSerializableException + // Common state shared by FailureSuite-launched tasks. We use a global object // for this because any local variables used in the task closures will rightfully // be copied for each task, so there's no other way for them to share state. 
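The ExternalAppendOnlyMap changes above route every hash lookup through the null-safe getKeyHashCode helper, so spilled maps can merge null keys without a NullPointerException. A minimal standalone sketch of the same idea, with illustrative names rather than the Spark internals themselves:

    import java.util.Comparator

    object NullSafeKeyHashSketch {
      // A null key gets a fixed hash of 0 instead of throwing NullPointerException.
      def keyHash[K, C](kc: (K, C)): Int =
        if (kc._1 == null) 0 else kc._1.hashCode()

      // Orders (key, combiner) pairs by their null-safe key hash, mirroring the
      // comparator used on the spill-merge path.
      class KeyHashComparator[K, C] extends Comparator[(K, C)] {
        override def compare(a: (K, C), b: (K, C)): Int =
          java.lang.Integer.compare(keyHash(a), keyHash(b))
      }

      def main(args: Array[String]): Unit = {
        val pairs = Seq[(String, Int)](("b", 2), (null, 0), ("a", 1))
        val byKeyHash = Ordering.comparatorToOrdering(new KeyHashComparator[String, Int])
        // The null key sorts with hash 0 instead of crashing the merge.
        println(pairs.sorted(byKeyHash))
      }
    }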
@@ -102,7 +104,8 @@ class FailureSuite extends FunSuite with LocalSparkContext { results.collect() } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("NotSerializableException")) + assert(thrown.getMessage.contains("NotSerializableException") || + thrown.getCause.getClass === classOf[NotSerializableException]) FailureSuiteState.clear() } @@ -116,21 +119,24 @@ class FailureSuite extends FunSuite with LocalSparkContext { sc.parallelize(1 to 10, 2).map(x => a).count() } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("NotSerializableException")) + assert(thrown.getMessage.contains("NotSerializableException") || + thrown.getCause.getClass === classOf[NotSerializableException]) // Non-serializable closure in an earlier stage val thrown1 = intercept[SparkException] { sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count() } assert(thrown1.getClass === classOf[SparkException]) - assert(thrown1.getMessage.contains("NotSerializableException")) + assert(thrown1.getMessage.contains("NotSerializableException") || + thrown1.getCause.getClass === classOf[NotSerializableException]) // Non-serializable closure in foreach function val thrown2 = intercept[SparkException] { sc.parallelize(1 to 10, 2).foreach(x => println(a)) } assert(thrown2.getClass === classOf[SparkException]) - assert(thrown2.getMessage.contains("NotSerializableException")) + assert(thrown2.getMessage.contains("NotSerializableException") || + thrown2.getCause.getClass === classOf[NotSerializableException]) FailureSuiteState.clear() } diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala index 4bd889135631b..8e4a9e2c9f56c 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala @@ -19,9 +19,29 @@ package org.apache.spark import org.scalatest.FunSuite +import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ class ImplicitOrderingSuite extends FunSuite with LocalSparkContext { + // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should. + test("basic inference of Orderings"){ + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(1 to 10) + + // These RDD methods are in the companion object so that the unserializable ScalaTest Engine + // won't be reachable from the closure object + + // Infer orderings after basic maps to particular types + val basicMapExpectations = ImplicitOrderingSuite.basicMapExpectations(rdd) + basicMapExpectations.map({case (met, explain) => assert(met, explain)}) + + // Infer orderings for other RDD methods + val otherRDDMethodExpectations = ImplicitOrderingSuite.otherRDDMethodExpectations(rdd) + otherRDDMethodExpectations.map({case (met, explain) => assert(met, explain)}) + } +} + +private object ImplicitOrderingSuite { class NonOrderedClass {} class ComparableClass extends Comparable[ComparableClass] { @@ -31,27 +51,36 @@ class ImplicitOrderingSuite extends FunSuite with LocalSparkContext { class OrderedClass extends Ordered[OrderedClass] { override def compare(o: OrderedClass): Int = ??? } - - // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should. 
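The FailureSuite assertions above now accept either a message mentioning NotSerializableException or a SparkException whose cause is the NotSerializableException itself, which is what the proactive serializability check produces. A rough, self-contained sketch of that wrap-the-cause pattern using plain Java serialization (this is not Spark's ClosureCleaner, just the same shape):

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    object ProactiveCheckSketch {
      class Unserializable            // deliberately does not extend Serializable

      // Try to serialize the closure up front; report failures with a clear message
      // while keeping the original exception reachable through getCause.
      def ensureSerializable(func: AnyRef): Unit = {
        try {
          val out = new ObjectOutputStream(new ByteArrayOutputStream())
          out.writeObject(func)
          out.close()
        } catch {
          case e: Exception => throw new RuntimeException("Task not serializable", e)
        }
      }

      def main(args: Array[String]): Unit = {
        val captured = new Unserializable
        val closure = (x: Int) => (x, captured)   // drags the unserializable value along
        try {
          ensureSerializable(closure)
        } catch {
          case e: RuntimeException =>
            // Callers can inspect the cause instead of parsing the message text.
            println(e.getMessage)
            println(e.getCause.isInstanceOf[NotSerializableException])  // true
        }
      }
    }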
- test("basic inference of Orderings"){ - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(1 to 10) - - // Infer orderings after basic maps to particular types - assert(rdd.map(x => (x, x)).keyOrdering.isDefined) - assert(rdd.map(x => (1, x)).keyOrdering.isDefined) - assert(rdd.map(x => (x.toString, x)).keyOrdering.isDefined) - assert(rdd.map(x => (null, x)).keyOrdering.isDefined) - assert(rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty) - assert(rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined) - assert(rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined) - - // Infer orderings for other RDD methods - assert(rdd.groupBy(x => x).keyOrdering.isDefined) - assert(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty) - assert(rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined) - assert(rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined) - assert(rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined) - assert(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined) + + def basicMapExpectations(rdd: RDD[Int]) = { + List((rdd.map(x => (x, x)).keyOrdering.isDefined, + "rdd.map(x => (x, x)).keyOrdering.isDefined"), + (rdd.map(x => (1, x)).keyOrdering.isDefined, + "rdd.map(x => (1, x)).keyOrdering.isDefined"), + (rdd.map(x => (x.toString, x)).keyOrdering.isDefined, + "rdd.map(x => (x.toString, x)).keyOrdering.isDefined"), + (rdd.map(x => (null, x)).keyOrdering.isDefined, + "rdd.map(x => (null, x)).keyOrdering.isDefined"), + (rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty, + "rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty"), + (rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined, + "rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined"), + (rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined, + "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined")) } -} + + def otherRDDMethodExpectations(rdd: RDD[Int]) = { + List((rdd.groupBy(x => x).keyOrdering.isDefined, + "rdd.groupBy(x => x).keyOrdering.isDefined"), + (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty, + "rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty"), + (rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined, + "rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined"), + (rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined, + "rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined"), + (rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined, + "rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined"), + (rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined, + "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined")) + } +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index b40fee7e9ab23..c4f2f7e34f4d5 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -206,6 +206,42 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { // substracted rdd return results as Tuple2 results(0) should be ((3, 33)) } + + test("sort with Java non serializable class - Kryo") { + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .setAppName("test") + .setMaster("local-cluster[2,1,512]") + sc = new 
SparkContext(conf) + val a = sc.parallelize(1 to 10, 2) + val b = a.map { x => + (new NonJavaSerializableClass(x), x) + } + // If the Kryo serializer is not used correctly, the shuffle would fail because the + // default Java serializer cannot handle the non serializable class. + val c = b.sortByKey().map(x => x._2) + assert(c.collect() === Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + } + + test("sort with Java non serializable class - Java") { + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + val conf = new SparkConf() + .setAppName("test") + .setMaster("local-cluster[2,1,512]") + sc = new SparkContext(conf) + val a = sc.parallelize(1 to 10, 2) + val b = a.map { x => + (new NonJavaSerializableClass(x), x) + } + // default Java serializer cannot handle the non serializable class. + val thrown = intercept[SparkException] { + b.sortByKey().collect() + } + + assert(thrown.getClass === classOf[SparkException]) + assert(thrown.getMessage.contains("NotSerializableException")) + } } object ShuffleSuite { @@ -215,5 +251,9 @@ object ShuffleSuite { x + y } - class NonJavaSerializableClass(val value: Int) + class NonJavaSerializableClass(val value: Int) extends Comparable[NonJavaSerializableClass] { + override def compareTo(o: NonJavaSerializableClass): Int = { + value - o.value + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8dd2a9b9f7373..9f498d579a095 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls @@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor { } } +class DAGSchedulerSuiteDummyException extends Exception + class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike with ImplicitSender with BeforeAndAfter with LocalSparkContext { @@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + // TODO: Fix this and un-ignore the test. + ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") { + val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { + override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 + override def zero(initialValue: Int): Int = 0 + override def addInPlace(r1: Int, r2: Int): Int = { + throw new DAGSchedulerSuiteDummyException + } + }) + + // Run this on executors + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } + } + + // Run this within a local thread + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) + } + + // Make sure we can still run local commands as well as cluster commands. 
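The two ShuffleSuite cases above sort by a key type that is Comparable but not java.io.Serializable: with Kryo configured the shuffle succeeds, while the default Java serializer fails with a NotSerializableException. A hedged sketch of the working configuration (the master URL and app name are placeholders, not taken from the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    // Comparable, so sortByKey can find an Ordering, but deliberately not java.io.Serializable.
    class IdKey(val id: Int) extends Comparable[IdKey] {
      override def compareTo(o: IdKey): Int = id - o.id
    }

    object KryoSortSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("kryo-sort-sketch")
          .setMaster("local[2]")
          // Without this setting the shuffle falls back to Java serialization
          // and fails on IdKey.
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(conf)
        val sorted = sc.parallelize(1 to 10, 2)
          .map(x => (new IdKey(x), x))
          .sortByKey()
          .values
          .collect()
        println(sorted.mkString(", "))
        sc.stop()
      }
    }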
+ assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + + test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") { + val e1 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0), + allowLocal = true, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + val e2 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0, 1), + allowLocal = false, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + // Make sure we can still run local commands as well as cluster commands. + assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") { val actorSystem = ActorSystem("test") val supervisor = actorSystem.actorOf( diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6df0a080961b6..71f48e295ecca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0l) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { + taskMetrics.inputMetrics should not be ('defined) taskMetrics.shuffleWriteMetrics should be ('defined) taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 59a618956a356..9ff2a487005c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Random + import scala.collection.mutable.ArrayBuffer import scala.collection.mutable @@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex } } +/** + * A Task implementation that results in a large serialized task. 
+ */ +class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) { + val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) + val random = new Random(0) + random.nextBytes(randomBuffer) + + override def runTask(context: TaskContext): Array[Byte] = randomBuffer + override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() +} + class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL} @@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) } + test("do not emit warning when serialized task is small") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(1) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + assert(!manager.emittedTaskSizeWarning) + + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + + assert(!manager.emittedTaskSizeWarning) + } + + test("emit warning when serialized task is large") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + + val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + assert(!manager.emittedTaskSizeWarning) + + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + + assert(manager.emittedTaskSizeWarning) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala new file mode 100644 index 0000000000000..5d15a68ac7e4f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.serializer; + +import java.io.NotSerializableException + +import org.scalatest.FunSuite + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkException +import org.apache.spark.SharedSparkContext + +/* A trivial (but unserializable) container for trivial functions */ +class UnserializableClass { + def op[T](x: T) = x.toString + + def pred[T](x: T) = x.toString.length % 2 == 0 +} + +class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext { + + def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass) + + test("throws expected serialization exceptions on actions") { + val (data, uc) = fixture + + val ex = intercept[SparkException] { + data.map(uc.op(_)).count + } + + assert(ex.getMessage.contains("Task not serializable")) + } + + // There is probably a cleaner way to eliminate boilerplate here, but we're + // iterating over a map from transformation names to functions that perform that + // transformation on a given RDD, creating one test case for each + + for (transformation <- + Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _, + "mapWith" -> xmapWith _, "mapPartitions" -> xmapPartitions _, + "mapPartitionsWithIndex" -> xmapPartitionsWithIndex _, + "mapPartitionsWithContext" -> xmapPartitionsWithContext _, + "filterWith" -> xfilterWith _)) { + val (name, xf) = transformation + + test(s"$name transformations throw proactive serialization exceptions") { + val (data, uc) = fixture + + val ex = intercept[SparkException] { + xf(data, uc) + } + + assert(ex.getMessage.contains("Task not serializable"), + s"RDD.$name doesn't proactively throw NotSerializableException") + } + } + + private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.map(y=>uc.op(y)) + private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapWith(x => x.toString)((x,y)=>x + uc.op(y)) + private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.flatMap(y=>Seq(uc.op(y))) + private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.filter(y=>uc.pred(y)) + private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.filterWith(x => x.toString)((x,y)=>uc.pred(y)) + private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitions(_.map(y=>uc.op(y))) + private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) + private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y))) + +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d7dbe5164b7f6..23cb6905bfdeb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._ import org.scalatest.Matchers import org.scalatest.time.SpanSugar._ -import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} +import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.executor.DataReadMethod import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import 
org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.language.postfixOps @@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } } + test("correct BlockResult returned from get() calls") { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, + mapOutputTracker) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list1ForSizeEstimate = new ArrayBuffer[Any] + list1ForSizeEstimate ++= list1.iterator + val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate) + val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150)) + val list2ForSizeEstimate = new ArrayBuffer[Any] + list2ForSizeEstimate ++= list2.iterator + val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val list1Get = store.get("list1") + assert(list1Get.isDefined, "list1 expected to be in store") + assert(list1Get.get.data.size === 2) + assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate) + assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2MemoryGet = store.get("list2memory") + assert(list2MemoryGet.isDefined, "list2memory expected to be in store") + assert(list2MemoryGet.get.data.size === 3) + assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate) + assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2DiskGet = store.get("list2disk") + assert(list2DiskGet.isDefined, "list2memory expected to be in store") + assert(list2DiskGet.get.data.size === 3) + System.out.println(list2DiskGet) + // We don't know the exact size of the data on disk, but it should certainly be > 0. 
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0) + assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk) + } + test("in-memory LRU storage") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, mapOutputTracker) @@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3").isDefined, "list3 was not in store") - assert(store.get("list3").get.size == 2) + assert(store.get("list3").get.data.size === 2) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list1").isDefined, "list1 was not in store") - assert(store.get("list1").get.size == 2) + assert(store.get("list1").get.data.size === 2) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3") === None, "list1 was in store") } @@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val listForSizeEstimate = new ArrayBuffer[Any] + listForSizeEstimate ++= list1.iterator + val listSize = SizeEstimator.estimate(listForSizeEstimate) // At this point LRU should not kick in because list3 is only on disk - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should 
drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) assert(store.get("list4").isDefined, "list4 was not in store") - assert(store.get("list4").get.size === 2) + assert(store.get("list4").get.data.size === 2) } test("negative byte values in ByteBufferInputStream") { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6c49870455873..058d31453081a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite { val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, - makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800)) + makeTaskInfo(123L, 234, 67, 345L, false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) + val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + makeTaskInfo(123L, 234, 67, 345L, false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite { testEvent(taskStart, taskStartJsonString) testEvent(taskGettingResult, taskGettingResultJsonString) testEvent(taskEnd, taskEndJsonString) + testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString) testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) @@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) // StorageLevel @@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } - test("Backward compatibility") { + test("StageInfo.details backward compatibility") { // StageInfo.details was added after 1.0.0. val info = makeStageInfo(1, 2, 3, 4L, 5L) assert(info.details.nonEmpty) @@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite { assert("" === newInfo.details) } + test("InputMetrics backward compatibility") { + // InputMetrics were added after 1.0.1. 
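The JsonProtocolSuite additions around this hunk serialize the new InputMetrics as an "Input Metrics" field and verify that event logs written before the field existed still parse. A hedged, standalone sketch of that round trip; it is not part of the patch, and because JsonProtocol is private[spark] the snippet assumes it lives under an org.apache.spark subpackage, as the suite itself effectively does:

    package org.apache.spark.sketch

    import org.json4s._   // for the removeField helper on JValue
    import org.apache.spark.executor.{DataReadMethod, InputMetrics, TaskMetrics}
    import org.apache.spark.util.JsonProtocol

    object InputMetricsJsonRoundTrip {
      def main(args: Array[String]): Unit = {
        val metrics = new TaskMetrics
        metrics.hostname = "localhost"
        val input = new InputMetrics(DataReadMethod.Hadoop)
        input.bytesRead = 2100L
        metrics.inputMetrics = Some(input)

        // New logs carry the "Input Metrics" field and round-trip cleanly.
        val json = JsonProtocol.taskMetricsToJson(metrics)
        val restored = JsonProtocol.taskMetricsFromJson(json)
        assert(restored.inputMetrics.get.readMethod == DataReadMethod.Hadoop)
        assert(restored.inputMetrics.get.bytesRead == 2100L)

        // Dropping the field simulates a pre-1.0.1 event log: inputMetrics becomes None.
        val oldJson = json.removeField { case (name, _) => name == "Input Metrics" }
        assert(JsonProtocol.taskMetricsFromJson(oldJson).inputMetrics.isEmpty)
      }
    }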
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) + assert(metrics.inputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.isEmpty) + } + /** -------------------------- * | Helper test running methods | @@ -242,7 +257,6 @@ class JsonProtocolSuite extends FunSuite { assert(info1.numTasks === info2.numTasks) assert(info1.submissionTime === info2.submissionTime) assert(info1.completionTime === info2.completionTime) - assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning) assert(info1.rddInfos.size === info2.rddInfos.size) (0 until info1.rddInfos.size).foreach { i => assertEquals(info1.rddInfos(i), info2.rddInfos(i)) @@ -279,7 +293,6 @@ class JsonProtocolSuite extends FunSuite { assert(info1.gettingResultTime === info2.gettingResultTime) assert(info1.finishTime === info2.finishTime) assert(info1.failed === info2.failed) - assert(info1.serializedSize === info2.serializedSize) } private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) { @@ -294,6 +307,8 @@ class JsonProtocolSuite extends FunSuite { metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals) assertOptionEquals( metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) + assertOptionEquals( + metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals) } @@ -311,6 +326,11 @@ class JsonProtocolSuite extends FunSuite { assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime) } + private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { + assert(metrics1.readMethod === metrics2.readMethod) + assert(metrics1.bytesRead === metrics2.bytesRead) + } + private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) { assert(bm1.executorId === bm2.executorId) assert(bm1.host === bm2.host) @@ -403,6 +423,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(w1, w2) } + private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) { + assertEquals(i1, i2) + } + private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) { assertEquals(t1, t2) } @@ -460,9 +484,19 @@ class JsonProtocolSuite extends FunSuite { new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) } - private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = { + /** + * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is + * set to true) or read data from a shuffle otherwise. 
+ */ + private def makeTaskMetrics( + a: Long, + b: Long, + c: Long, + d: Long, + e: Int, + f: Int, + hasHadoopInput: Boolean) = { val t = new TaskMetrics - val sr = new ShuffleReadMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" t.executorDeserializeTime = a @@ -471,15 +505,23 @@ class JsonProtocolSuite extends FunSuite { t.jvmGCTime = d t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c - sr.shuffleFinishTime = b + c - sr.totalBlocksFetched = e + f - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + + if (hasHadoopInput) { + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + inputMetrics.bytesRead = d + e + f + t.inputMetrics = Some(inputMetrics) + } else { + val sr = new ShuffleReadMetrics + sr.shuffleFinishTime = b + c + sr.totalBlocksFetched = e + f + sr.remoteBytesRead = b + d + sr.localBlocksFetched = e + sr.fetchWaitTime = a + d + sr.remoteBlocksFetched = f + t.shuffleReadMetrics = Some(sr) + } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d - t.shuffleReadMetrics = Some(sr) t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => @@ -496,9 +538,8 @@ class JsonProtocolSuite extends FunSuite { private val stageSubmittedJsonString = """ {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": - "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details", - "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin", - "Russia":"Moscow","Ukraine":"Kiev"}} + "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties": + {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} """ private val stageCompletedJsonString = @@ -507,8 +548,7 @@ class JsonProtocolSuite extends FunSuite { "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, - "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details", - "Emitted Task Size Warning":false}} + "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}} """ private val taskStartJsonString = @@ -516,7 +556,7 @@ class JsonProtocolSuite extends FunSuite { |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0, - |"Failed":false,"Serialized Size":0}} + |"Failed":false}} """.stripMargin private val taskGettingResultJsonString = @@ -524,7 +564,7 @@ class JsonProtocolSuite extends FunSuite { |{"Event":"SparkListenerTaskGettingResult","Task Info": | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor", | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0, - | "Finish Time":0,"Failed":false,"Serialized Size":0 + | "Finish Time":0,"Failed":false | } |} """.stripMargin @@ -536,7 +576,7 @@ class JsonProtocolSuite extends FunSuite { |"Task Info":{ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + | "Getting Result 
Time":0,"Finish Time":0,"Failed":false |}, |"Task Metrics":{ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, @@ -552,8 +592,9 @@ class JsonProtocolSuite extends FunSuite { | }, | "Shuffle Write Metrics":{ | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500}, - | "Updated Blocks":[ + | "Shuffle Write Time":1500 + | }, + | "Updated Blocks":[ | {"Block ID":"rdd_0_0", | "Status":{ | "Storage Level":{ @@ -568,6 +609,35 @@ class JsonProtocolSuite extends FunSuite { |} """.stripMargin + private val taskEndWithHadoopInputJsonString = + """ + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, + | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ]} + |} + """ + private val jobStartJsonString = """ {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index deb780953579d..428822949c085 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -334,8 +334,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) - val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, - mergeValue, mergeCombiners) + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]]( + createCombiner, mergeValue, mergeCombiners) (1 to 100000).foreach { i => map.insert(i, i) } map.insert(Int.MaxValue, Int.MaxValue) @@ -346,11 +346,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { it.next() } } + + test("spilling with null keys and values") { + val conf = new SparkConf(true) + conf.set("spark.shuffle.memoryFraction", "0.001") + sc = new SparkContext("local-cluster[1,1,512]", "test", conf) + + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]]( + createCombiner, mergeValue, mergeCombiners) + + (1 to 100000).foreach { i => map.insert(i, i) } + map.insert(null.asInstanceOf[Int], 1) + map.insert(1, null.asInstanceOf[Int]) + map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int]) + + val it = map.iterator + while (it.hasNext) { + // Should not throw NullPointerException + it.next() + } + } + } /** * A dummy class that always returns the same hash code, to easily test hash collisions */ -case class FixedHashObject(val v: Int, val h: Int) extends 
Serializable { +case class FixedHashObject(v: Int, h: Int) extends Serializable { override def hashCode(): Int = h } diff --git a/docs/mllib-naive-bayes.md b/docs/mllib-naive-bayes.md index 4b3a7cab32118..1d1d7dcf6ffcb 100644 --- a/docs/mllib-naive-bayes.md +++ b/docs/mllib-naive-bayes.md @@ -51,9 +51,8 @@ val training = splits(0) val test = splits(1) val model = NaiveBayes.train(training, lambda = 1.0) -val prediction = model.predict(test.map(_.features)) -val predictionAndLabel = prediction.zip(test.map(_.label)) +val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count() {% endhighlight %} @@ -71,6 +70,7 @@ can be used for evaluation and prediction. import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.classification.NaiveBayes; import org.apache.spark.mllib.classification.NaiveBayesModel; import org.apache.spark.mllib.regression.LabeledPoint; @@ -81,18 +81,12 @@ JavaRDD test = ... // test set final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0); -JavaRDD prediction = - test.map(new Function() { - @Override public Double call(LabeledPoint p) { - return model.predict(p.features()); - } - }); JavaPairRDD predictionAndLabel = - prediction.zip(test.map(new Function() { - @Override public Double call(LabeledPoint p) { - return p.label(); + test.mapToPair(new PairFunction() { + @Override public Tuple2 call(LabeledPoint p) { + return new Tuple2(model.predict(p.features()), p.label()); } - })); + }); double accuracy = 1.0 * predictionAndLabel.filter(new Function, Boolean>() { @Override public Boolean call(Tuple2 pl) { return pl._1() == pl._2(); diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 65d75b85efda6..06e4c4ce527e1 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -145,7 +145,7 @@ that contains information about your application. {% highlight python %} conf = SparkConf().setAppName(appName).setMaster(master) -sc = SparkContext(conf) +sc = SparkContext(conf=conf) {% endhighlight %} diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index e3c8922404365..bd046cfc1837d 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -127,7 +127,8 @@ val sc = new SparkContext(conf) {% endhighlight %} (You can also use [`spark-submit`](submitting-applications.html) and configure `spark.executor.uri` -in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file.) +in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file. Note +that `spark-submit` currently only supports deploying the Spark driver in `client` mode for Mesos.) When running a shell, the `spark.executor.uri` parameter is inherited from `SPARK_EXECUTOR_URI`, so it does not need to be redundantly passed in as a system property. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 3c1ce06083ede..f5c0f7cef83d2 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -235,11 +235,10 @@ You can also pass an option `--cores ` to control the number of cores # Launching Compiled Spark Applications -Spark supports two deploy modes: applications may run with the driver inside the client process or -entirely inside the cluster. 
The -[`spark-submit` script](submitting-applications.html) provides the -most straightforward way to submit a compiled Spark application to the cluster in either deploy -mode. +The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to +submit a compiled Spark application to the cluster. For standalone clusters, Spark currently +only supports deploying the driver inside the client process that is submitting the application +(`client` deploy mode). If your application is launched through Spark submit, then the application jar is automatically distributed to all worker nodes. For any additional jars that your application depends on, you diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index d2864fe4c2f65..e05883072bfa8 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -42,10 +42,22 @@ Some of the commonly used options are: * `--class`: The entry point for your application (e.g. `org.apache.spark.examples.SparkPi`) * `--master`: The [master URL](#master-urls) for the cluster (e.g. `spark://23.195.26.187:7077`) -* `--deploy-mode`: Whether to deploy your driver program within the cluster or run it locally as an external client (either `cluster` or `client`) +* `--deploy-mode`: Whether to deploy your driver on the worker nodes (`cluster`) or locally as an external client (`client`) (default: `client`)* * `application-jar`: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an `hdfs://` path or a `file://` path that is present on all nodes. * `application-arguments`: Arguments passed to the main method of your main class, if any +*A common deployment strategy is to submit your application from a gateway machine that is +physically co-located with your worker machines (e.g. Master node in a standalone EC2 cluster). +In this setup, `client` mode is appropriate. In `client` mode, the driver is launched directly +within the client `spark-submit` process, with the input and output of the application attached +to the console. Thus, this mode is especially suitable for applications that involve the REPL +(e.g. Spark shell). + +Alternatively, if your application is submitted from a machine far from the worker machines (e.g. +locally on your laptop), it is common to use `cluster` mode to minimize network latency between +the drivers and the executors. Note that `cluster` mode is currently not supported for standalone +clusters, Mesos clusters, or python applications. + For Python applications, simply pass a `.py` file in the place of `` instead of a JAR, and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`. diff --git a/make-distribution.sh b/make-distribution.sh index 86868438e75c3..94b473bf91cd3 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -100,6 +100,14 @@ if [ -z "$JAVA_HOME" ]; then exit -1 fi +if which git &>/dev/null; then + GITREV=$(git rev-parse --short HEAD 2>/dev/null || :) + if [ ! -z $GITREV ]; then + GITREVSTRING=" (git revision $GITREV)" + fi + unset GITREV +fi + if ! which mvn &>/dev/null; then echo -e "You need Maven installed to build Spark." 
echo -e "Download Maven from https://maven.apache.org/" @@ -186,7 +194,7 @@ ${BUILD_COMMAND} # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/lib" -echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE" +echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE" # Copy jars cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/" diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 1a0073c9d487e..695e03b736baf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -347,6 +347,8 @@ class RowMatrix( * The principal components are stored in a local matrix of size n-by-k. * Each column corresponds to one principal component, * and the columns are in descending order of component variance. + * The row data do not need to be "centered" first; it is not necessary for + * the mean of each column to be 0. * * @param k number of top principal components. * @return a matrix of size n-by-k, whose columns are principal components diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 55a2aa0fc7141..599714233c18f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -371,7 +371,7 @@ object SparkBuild extends Build { "net.java.dev.jets3t" % "jets3t" % jets3tVersion excludeAll(excludeCommonsLogging), "commons-codec" % "commons-codec" % "1.5", // Prevent jets3t from including the older version of commons-codec "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm), + "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm, excludeServletApi), "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeJBossNetty), "com.codahale.metrics" % "metrics-core" % codahaleMetricsVersion, "com.codahale.metrics" % "metrics-jvm" % codahaleMetricsVersion, diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index b2f226a55ec13..5eb1c63bf206b 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -103,6 +103,7 @@ def waitSocketClose(sock): if os.fork() == 0: # Leave the worker pool signal.signal(SIGHUP, SIG_DFL) + signal.signal(SIGCHLD, SIG_DFL) listen_sock.close() # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index ada48eaf5dc0f..5a55be1e51558 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -47,10 +47,13 @@ object ScalaReflection { val TypeRef(_, _, Seq(optType)) = t Schema(schemaFor(optType).dataType, nullable = true) case t if t <:< typeOf[Product] => - val params = t.member("<init>": TermName).asMethod.paramss + val formalTypeArgs = t.typeSymbol.asClass.typeParams + val TypeRef(_, _, actualTypeArgs) = t + val params = t.member(nme.CONSTRUCTOR).asMethod.paramss Schema(StructType(
params.head.map { p => - val Schema(dataType, nullable) = schemaFor(p.typeSignature) + val Schema(dataType, nullable) = + schemaFor(p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)) StructField(p.name.toString, dataType, nullable) }), nullable = true) // Need to decide if we actually need a special type here. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 0cc4592047b19..61762fa2a7c30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.types._ * * Limitations: * - Only supports a very limited subset of SQL. - * - Keywords must be capital. * * This is currently included mostly for illustrative purposes. Users wanting more complete support * for a SQL like language should checkout the HiveQL support in the sql/hive sub-project. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 26ad4837b0b01..1b503b957d146 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -108,6 +108,17 @@ package object dsl { implicit def symbolToUnresolvedAttribute(s: Symbol) = analysis.UnresolvedAttribute(s.name) + def sum(e: Expression) = Sum(e) + def sumDistinct(e: Expression) = SumDistinct(e) + def count(e: Expression) = Count(e) + def countDistinct(e: Expression*) = CountDistinct(e) + def avg(e: Expression) = Average(e) + def first(e: Expression) = First(e) + def min(e: Expression) = Min(e) + def max(e: Expression) = Max(e) + def upper(e: Expression) = Upper(e) + def lower(e: Expression) = Lower(e) + implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s = sym.name } // TODO more implicit class for literal? 
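The `ScalaReflection` change above makes `schemaFor` work for generic `Product` types by substituting the constructor parameters' formal type arguments with the actual ones of the reflected type. A minimal REPL-style sketch of that reflection step, assuming Scala 2.10's runtime universe (the `GenericData` case class is the same one added to `ScalaReflectionSuite` further below):

    import scala.reflect.runtime.universe._

    case class GenericData[A](genericField: A)

    val t = typeOf[GenericData[Int]]
    val formalTypeArgs = t.typeSymbol.asClass.typeParams      // List(type A)
    val TypeRef(_, _, actualTypeArgs) = t                     // List(Int)
    val param = t.member(nme.CONSTRUCTOR).asMethod.paramss.head.head

    param.typeSignature                                       // A -- no usable schema type
    param.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)  // Int

Without the `substituteTypes` call, schema derivation would be attempted on the abstract type `A` and fail, which is exactly what the new "generic data" and "tuple data" tests exercise.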
implicit class DslString(val s: String) extends ImplicitOperators { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 0411ce3aefda1..ba62dabe3dd6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -111,7 +111,7 @@ abstract class Expression extends TreeNode[Expression] { } else { e1.dataType match { case n: NumericType => - f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) => Int]( + f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) => n.JvmType]( n.numeric, evalE1.asInstanceOf[n.JvmType], evalE2.asInstanceOf[n.JvmType]) case other => sys.error(s"Type $other does not support numeric operations") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index 489d7e9c2437f..c0438dbe52a47 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -60,6 +60,9 @@ case class ComplexData( mapField: Map[Int, String], structField: PrimitiveData) +case class GenericData[A]( + genericField: A) + class ScalaReflectionSuite extends FunSuite { import ScalaReflection._ @@ -128,4 +131,21 @@ class ScalaReflectionSuite extends FunSuite { nullable = true))), nullable = true)) } + + test("generic data") { + val schema = schemaFor[GenericData[Int]] + assert(schema === Schema( + StructType(Seq( + StructField("genericField", IntegerType, nullable = false))), + nullable = true)) + } + + test("tuple data") { + val schema = schemaFor[(Int, String)] + assert(schema === Schema( + StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType, nullable = true))), + nullable = true)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 2fe7f94663996..3b5abab969861 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -50,7 +50,7 @@ trait SQLConf { /** ********************** SQLConf functionality methods ************ */ @transient - private val settings = java.util.Collections.synchronizedMap( + protected[sql] val settings = java.util.Collections.synchronizedMap( new java.util.HashMap[String, String]()) def set(props: Properties): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 7c0efb4566610..8f9f54f610e9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -133,8 +133,13 @@ class SchemaRDD( * * @group Query */ - def select(exprs: NamedExpression*): SchemaRDD = - new SchemaRDD(sqlContext, Project(exprs, logicalPlan)) + def select(exprs: Expression*): SchemaRDD = { + val aliases = exprs.zipWithIndex.map { + case (ne: NamedExpression, _) => ne + case (e, i) => Alias(e, s"c$i")() + } + new SchemaRDD(sqlContext, Project(aliases, logicalPlan)) + } /** * Filters the output, only returning those rows where `condition` evaluates to true. 
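With the `select` change just above, arbitrary expressions can be projected, not only `NamedExpression`s; anything unnamed is auto-aliased positionally as `c0`, `c1`, and so on. Combined with the new `dsl` helpers (`sum`, `avg`, `countDistinct`, ...), this allows the query style exercised by the `DslQuerySuite` additions that follow. A small sketch against the existing `testData` table (columns `key: Int`, `value: String`), assuming the DSL conversions are in scope as they are in those tests:

    // unnamed expressions come back as columns c0 and c1
    val projected = testData.select('key + 'key, upper('value))

    // aggregate over the whole table: no grouping expressions, two aggregates
    val aggregated = testData.groupBy()(avg('key), countDistinct('value))

Nothing here is new API surface beyond what this diff adds; it only removes the need to wrap expressions in `Alias` or spell out `Average(...)` and `CountDistinct(... :: Nil)` by hand.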
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index e4a64a7a482b8..04ac008682f5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -60,6 +60,26 @@ class DslQuerySuite extends QueryTest { Seq(Seq("1"))) } + test("select with functions") { + checkAnswer( + testData.select(sum('value), avg('value), count(1)), + Seq(Seq(5050.0, 50.5, 100))) + + checkAnswer( + testData2.select('a + 'b, 'a < 'b), + Seq( + Seq(2, false), + Seq(3, true), + Seq(3, false), + Seq(4, false), + Seq(4, false), + Seq(5, false))) + + checkAnswer( + testData2.select(sumDistinct('a)), + Seq(Seq(6))) + } + test("sorting") { checkAnswer( testData2.orderBy('a.asc, 'b.asc), @@ -110,17 +130,17 @@ class DslQuerySuite extends QueryTest { test("average") { checkAnswer( - testData2.groupBy()(Average('a)), + testData2.groupBy()(avg('a)), 2.0) } test("null average") { checkAnswer( - testData3.groupBy()(Average('b)), + testData3.groupBy()(avg('b)), 2.0) checkAnswer( - testData3.groupBy()(Average('b), CountDistinct('b :: Nil)), + testData3.groupBy()(avg('b), countDistinct('b)), (2.0, 1) :: Nil) } @@ -130,17 +150,17 @@ class DslQuerySuite extends QueryTest { test("null count") { checkAnswer( - testData3.groupBy('a)('a, Count('b)), + testData3.groupBy('a)('a, count('b)), Seq((1,0), (2, 1)) ) checkAnswer( - testData3.groupBy('a)('a, Count('a + 'b)), + testData3.groupBy('a)('a, count('a + 'b)), Seq((1,0), (2, 1)) ) checkAnswer( - testData3.groupBy()(Count('a), Count('b), Count(1), CountDistinct('a :: Nil), CountDistinct('b :: Nil)), + testData3.groupBy()(count('a), count('b), count(1), countDistinct('a), countDistinct('b)), (2, 1, 2, 2, 1) :: Nil ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 3d7d5eedbe8ed..054b14f8f7ffa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -39,25 +39,27 @@ class JoinSuite extends QueryTest { test("plans broadcast hash join, given hints") { def mkTest(buildSide: BuildSide, leftTable: String, rightTable: String) = { - TestSQLContext.set("spark.sql.join.broadcastTables", - s"${if (buildSide == BuildRight) rightTable else leftTable}") - val rdd = sql(s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""") - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. - val physical = rdd.queryExecution.sparkPlan - val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j } - - assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join") - checkAnswer( - rdd, - Seq( - (1, "1", 1, 1), - (1, "1", 1, 2), - (2, "2", 2, 1), - (2, "2", 2, 2), - (3, "3", 3, 1), - (3, "3", 3, 2) - )) + TestSQLContext.settings.synchronized { + TestSQLContext.set("spark.sql.join.broadcastTables", + s"${if (buildSide == BuildRight) rightTable else leftTable}") + val rdd = sql( s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""") + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. 
+ val physical = rdd.queryExecution.sparkPlan + val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j} + + assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join") + checkAnswer( + rdd, + Seq( + (1, "1", 1, 1), + (1, "1", 1, 2), + (2, "2", 2, 1), + (2, "2", 2, 2), + (3, "3", 3, 1), + (3, "3", 3, 2) + )) + } } mkTest(BuildRight, "testData", "testData2") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 08293f7f0ca30..93792f698cfaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -28,46 +28,50 @@ class SQLConfSuite extends QueryTest { val testVal = "test.val.0" test("programmatic ways of basic setting and getting") { - clear() - assert(getOption(testKey).isEmpty) - assert(getAll.toSet === Set()) + TestSQLContext.settings.synchronized { + clear() + assert(getOption(testKey).isEmpty) + assert(getAll.toSet === Set()) - set(testKey, testVal) - assert(get(testKey) == testVal) - assert(get(testKey, testVal + "_") == testVal) - assert(getOption(testKey) == Some(testVal)) - assert(contains(testKey)) + set(testKey, testVal) + assert(get(testKey) == testVal) + assert(get(testKey, testVal + "_") == testVal) + assert(getOption(testKey) == Some(testVal)) + assert(contains(testKey)) - // Tests SQLConf as accessed from a SQLContext is mutable after - // the latter is initialized, unlike SparkConf inside a SparkContext. - assert(TestSQLContext.get(testKey) == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.getOption(testKey) == Some(testVal)) - assert(TestSQLContext.contains(testKey)) + // Tests SQLConf as accessed from a SQLContext is mutable after + // the latter is initialized, unlike SparkConf inside a SparkContext. 
+ assert(TestSQLContext.get(testKey) == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.getOption(testKey) == Some(testVal)) + assert(TestSQLContext.contains(testKey)) - clear() + clear() + } } test("parse SQL set commands") { - clear() - sql(s"set $testKey=$testVal") - assert(get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + TestSQLContext.settings.synchronized { + clear() + sql(s"set $testKey=$testVal") + assert(get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - sql("set mapred.reduce.tasks=20") - assert(get("mapred.reduce.tasks", "0") == "20") - sql("set mapred.reduce.tasks = 40") - assert(get("mapred.reduce.tasks", "0") == "40") + sql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + sql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") - val key = "spark.sql.key" - val vs = "val0,val_1,val2.3,my_table" - sql(s"set $key=$vs") - assert(get(key, "0") == vs) + val key = "spark.sql.key" + val vs = "val0,val_1,val2.3,my_table" + sql(s"set $key=$vs") + assert(get(key, "0") == vs) - sql(s"set $key=") - assert(get(key, "0") == "") + sql(s"set $key=") + assert(get(key, "0") == "") - clear() + clear() + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bf7fafe952303..2c1cb1867010c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -372,38 +372,40 @@ class SQLQuerySuite extends QueryTest { } test("SET commands semantics using sql()") { - clear() - val testKey = "test.key.0" - val testVal = "test.val.0" - val nonexistentKey = "nonexistent" - - // "set" itself returns all config variables currently specified in SQLConf. - assert(sql("SET").collect().size == 0) - - // "set key=val" - sql(s"SET $testKey=$testVal") - checkAnswer( - sql("SET"), - Seq(Seq(testKey, testVal)) - ) - - sql(s"SET ${testKey + testKey}=${testVal + testVal}") - checkAnswer( - sql("set"), - Seq( - Seq(testKey, testVal), - Seq(testKey + testKey, testVal + testVal)) - ) - - // "set key" - checkAnswer( - sql(s"SET $testKey"), - Seq(Seq(testKey, testVal)) - ) - checkAnswer( - sql(s"SET $nonexistentKey"), - Seq(Seq(nonexistentKey, "")) - ) - clear() + TestSQLContext.settings.synchronized { + clear() + val testKey = "test.key.0" + val testVal = "test.val.0" + val nonexistentKey = "nonexistent" + + // "set" itself returns all config variables currently specified in SQLConf. 
+ assert(sql("SET").collect().size == 0) + + // "set key=val" + sql(s"SET $testKey=$testVal") + checkAnswer( + sql("SET"), + Seq(Seq(testKey, testVal)) + ) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + checkAnswer( + sql("set"), + Seq( + Seq(testKey, testVal), + Seq(testKey + testKey, testVal + testVal)) + ) + + // "set key" + checkAnswer( + sql(s"SET $testKey"), + Seq(Seq(testKey, testVal)) + ) + checkAnswer( + sql(s"SET $nonexistentKey"), + Seq(Seq(nonexistentKey, "")) + ) + clear() + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 4d7c84f443879..34d8a061ccc83 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -26,6 +26,9 @@ import scala.collection.JavaConversions._ * A set of test cases that validate partition and column pruning. */ class PruningSuite extends HiveComparisonTest { + // MINOR HACK: You must run a query before calling reset the first time. + TestHive.hql("SHOW TABLES") + // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet, need to reset // the environment to ensure all referenced tables in this suites are not cached in-memory. // Refer to https://issues.apache.org/jira/browse/SPARK-2283 for details. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4709a62381647..e05db236addca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -532,7 +532,10 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } /** @@ -540,7 +543,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) } /** @@ -548,7 +554,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. 
*/ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - val cleanedF = context.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) cleanedF(rdds.head.asInstanceOf[RDD[T]], time) @@ -563,7 +572,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -574,7 +586,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2) val rdd1 = rdds(0).asInstanceOf[RDD[T]] diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 1cc9c33cd2d02..438737f7a6b60 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils +import org.apache.spark.util.{SignalLogger, Utils} /** * An application master that runs the users driver program and allocates executors. @@ -409,7 +409,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } -object ApplicationMaster { +object ApplicationMaster extends Logging { // Number of times to wait for the allocator loop to complete. // Each loop iteration waits for 100ms, so maximum of 3 seconds. 
// This is to ensure that we have reasonable number of containers before we start @@ -487,6 +487,7 @@ object ApplicationMaster { } def main(argStrings: Array[String]) { + SignalLogger.register(log) val args = new ApplicationMasterArguments(argStrings) SparkHadoopUtil.get.runAsSparkUser { () => new ApplicationMaster(args).run() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 6244332f23737..ee1e9c9c23d22 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils +import org.apache.spark.util.{SignalLogger, Utils} /** @@ -363,7 +363,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } -object ApplicationMaster { +object ApplicationMaster extends Logging { // Number of times to wait for the allocator loop to complete. // Each loop iteration waits for 100ms, so maximum of 3 seconds. // This is to ensure that we have reasonable number of containers before we start @@ -455,6 +455,7 @@ object ApplicationMaster { } def main(argStrings: Array[String]) { + SignalLogger.register(log) val args = new ApplicationMasterArguments(argStrings) SparkHadoopUtil.get.runAsSparkUser { () => new ApplicationMaster(args).run()
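The two YARN `ApplicationMaster` changes above follow the same pattern: make the companion object a `Logging` and call `SignalLogger.register(log)` first thing in `main`, so that fatal signals (for example a SIGTERM from the cluster manager) are logged before the JVM dies. A minimal sketch of that pattern for some other long-running entry point; `MyDaemon` and its package are hypothetical, and since these utilities are Spark-internal the code is assumed to live under the `org.apache.spark` namespace, as the ApplicationMasters do:

    package org.apache.spark.example

    import org.apache.spark.Logging
    import org.apache.spark.util.SignalLogger

    object MyDaemon extends Logging {
      def main(args: Array[String]) {
        // Register the signal handlers before any real work starts, so container
        // logs can distinguish an external kill from an internal crash.
        SignalLogger.register(log)  // `log` is provided by the Logging trait
        // ... launch the actual long-running service here ...
      }
    }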