
Commit

Merge branch 'master' of git://git.apache.org/spark into additional-parquet-filter-testcases
sarutak committed Dec 1, 2014
2 parents 7550dcb + aea7a99 commit 3deb665
Showing 97 changed files with 1,862 additions and 950 deletions.
2 changes: 2 additions & 0 deletions bin/spark-sql
@@ -23,6 +23,8 @@
# Enter posix mode for bash
set -o posix

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"

# Figure out where Spark is installed
core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
@@ -28,17 +28,12 @@ $(function() {
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');
});

$("input:checkbox:not(:checked)").each(function() {
var column = "table ." + $(this).attr("name");
$(column).hide();
});
// Stripe table rows after rows have been hidden to ensure correct striping.
stripeTables();
stripeSummaryTable();

$("input:checkbox").click(function() {
var column = "table ." + $(this).attr("name");
$(column).toggle();
stripeTables();
stripeSummaryTable();
});

$("#select-all-metrics").click(function() {
24 changes: 13 additions & 11 deletions core/src/main/resources/org/apache/spark/ui/static/table.js
@@ -15,16 +15,18 @@
* limitations under the License.
*/

/* Adds background colors to stripe table rows. This is necessary (instead of using css or the
* table striping provided by bootstrap) to appropriately stripe tables with hidden rows. */
function stripeTables() {
$("table.table-striped-custom").each(function() {
$(this).find("tr:not(:hidden)").each(function (index) {
if (index % 2 == 1) {
$(this).css("background-color", "#f9f9f9");
} else {
$(this).css("background-color", "#ffffff");
}
});
/* Adds background colors to stripe table rows in the summary table (on the stage page). This is
* necessary (instead of using css or the table striping provided by bootstrap) because the summary
* table has hidden rows.
*
* An ID selector (rather than a class selector) is used to ensure this runs quickly even on pages
* with thousands of task rows (ID selectors are much faster than class selectors). */
function stripeSummaryTable() {
$("#task-summary-table").find("tr:not(:hidden)").each(function (index) {
if (index % 2 == 1) {
$(this).css("background-color", "#f9f9f9");
} else {
$(this).css("background-color", "#ffffff");
}
});
}
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -168,3 +168,9 @@ span.additional-metric-title {
border-left: 5px solid black;
display: inline-block;
}

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

@@ -282,7 +284,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0

def newId: Long = synchronized {
def newId(): Long = synchronized {
lastId += 1
lastId
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -61,7 +61,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value")
throw new NullPointerException("null value for " + key)
}
settings(key) = value
this
73 changes: 51 additions & 22 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.python

import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}

import org.apache.spark.input.PortableDataStream

@@ -47,7 +47,7 @@ private[spark] class PythonRDD(
pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
broadcastVars: JList[Broadcast[Array[Array[Byte]]]],
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {

@@ -230,8 +230,7 @@ private[spark] class PythonRDD(
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
dataOut.writeLong(broadcast.value.map(_.length.toLong).sum)
broadcast.value.foreach(dataOut.write)
PythonRDD.writeUTF(broadcast.value.path, dataOut)
oldBids.add(broadcast.id)
}
}
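With this change the JVM sends only the broadcast's file path to the Python worker instead of streaming the serialized payload itself; the worker then reads the file locally. As a hedged sketch of that write (the helper PythonRDD.writeUTF is real and used above, but the body below is an assumption about the usual length-prefixed pattern in PySpark's wire protocol):

import java.io.DataOutputStream
import java.nio.charset.StandardCharsets

object WireFormatSketch {
  // Assumed shape of the helper: write the string's byte length first, then its
  // UTF-8 bytes, so the Python side can read back exactly that many bytes.
  def writeUTF(str: String, dataOut: DataOutputStream): Unit = {
    val bytes = str.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }
}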
@@ -368,24 +367,8 @@ private[spark] object PythonRDD extends Logging {
}
}

def readBroadcastFromFile(
sc: JavaSparkContext,
filename: String): Broadcast[Array[Array[Byte]]] = {
val size = new File(filename).length()
val file = new DataInputStream(new FileInputStream(filename))
val blockSize = 1 << 20
val n = ((size + blockSize - 1) / blockSize).toInt
val obj = new Array[Array[Byte]](n)
try {
for (i <- 0 until n) {
val length = if (i < (n - 1)) blockSize else (size % blockSize).toInt
obj(i) = new Array[Byte](length)
file.readFully(obj(i))
}
} finally {
file.close()
}
sc.broadcast(obj)
def readBroadcastFromFile(sc: JavaSparkContext, path: String): Broadcast[PythonBroadcast] = {
sc.broadcast(new PythonBroadcast(path))
}

def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
@@ -824,3 +807,49 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
}
}
}

/**
* A wrapper for a Python broadcast variable whose data is written to disk by Python. After
* deserialization it writes the data back to disk so that Python workers can read it again.
*/
private[spark] class PythonBroadcast(@transient var path: String) extends Serializable {

/**
* Read the data from disk, then copy it to `out`.
*/
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val in = new FileInputStream(new File(path))
try {
Utils.copyStream(in, out)
} finally {
in.close()
}
}

/**
* Write the data to disk, using a randomly generated file name.
*/
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = new FileOutputStream(file)
try {
Utils.copyStream(in, out)
} finally {
out.close()
}
}

/**
* Delete the file once the object is GCed.
*/
override def finalize() {
if (!path.isEmpty) {
val file = new File(path)
if (file.exists()) {
file.delete()
}
}
}
}
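For illustration, a hedged round-trip sketch of how this wrapper behaves under Java serialization. The object, file names, and payload below are made up, and it assumes a live SparkContext because readObject resolves the local directory through SparkEnv.get.conf:

package org.apache.spark.api.python

import java.io._
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

object PythonBroadcastRoundTrip {
  def main(args: Array[String]): Unit = {
    // readObject needs SparkEnv for the local dir, so start a local context first.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("pb-roundtrip"))
    try {
      // Stand-in for a file that Python has already written to disk.
      val src = File.createTempFile("pybroadcast-src", "")
      Files.write(src.toPath, "payload written by python".getBytes("UTF-8"))

      // Serialize: writeObject streams the file's bytes into the object stream.
      val buffer = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(buffer)
      oos.writeObject(new PythonBroadcast(src.getAbsolutePath))
      oos.close()

      // Deserialize: readObject re-materializes the bytes into a fresh temp file,
      // whose path a Python worker could then open and read.
      val ois = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      val copy = ois.readObject().asInstanceOf[PythonBroadcast]
      println(s"re-materialized at: ${copy.path}")
    } finally {
      sc.stop()
    }
  }
}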
core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -142,6 +142,8 @@ object SparkSubmit {
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case _ =>
}

@@ -345,6 +347,11 @@ object SparkSubmit {
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}

// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
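To illustrate the scala.App warning above (this example is not part of the patch): objects extending scala.App run their body through DelayedInit, so values defined there may not be initialized the way users expect when main is invoked reflectively by SparkSubmit; an explicit main method is the safer shape.

// Recommended: an explicit main() method that SparkSubmit can call directly.
object GoodApp {
  def main(args: Array[String]): Unit = {
    // build the SparkConf / SparkContext and run the job here
  }
}

// Discouraged: the body runs via DelayedInit, which may not behave as expected
// when main() is invoked through reflection.
object RiskyApp extends scala.App {
  val message = "initialized in the delayed-init body"
  println(message)
}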
@@ -388,6 +395,13 @@ object SparkSubmit {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

/**
* Return whether the given main class represents a sql shell.
*/
private[spark] def isSqlShell(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}

/**
* Return whether the given primary resource requires running python.
*/
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -29,22 +29,27 @@ import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

/**
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
*/
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {

import FsHistoryProvider._

private val NOT_STARTED = "<Not Started>"

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000

private val logDir = conf.get("spark.history.fs.logDirectory", null)
private val resolvedLogDir = Option(logDir)
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)

private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -87,14 +92,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

private def initialize() {
// Validate the log directory.
val path = new Path(resolvedLogDir)
val path = new Path(logDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(resolvedLogDir))
var msg = s"Log directory specified does not exist: $logDir."
if (logDir == DEFAULT_LOG_DIR) {
msg += " Did you configure the correct one through spark.fs.history.logDirectory?"
}
throw new IllegalArgumentException(msg)
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(resolvedLogDir))
"Logging directory specified is not a directory: %s".format(logDir))
}

checkForLogs()
@@ -134,8 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}

override def getConfig(): Map[String, String] =
Map("Event Log Location" -> resolvedLogDir.toString)
override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)

/**
* Builds the application list based on the current contents of the log directory.
@@ -146,7 +153,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logStatus = fs.listStatus(new Path(logDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Load all new logs from the log directory. Only directories that have a modification time
@@ -244,6 +251,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

}

private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}

private class FsApplicationHistoryInfo(
val logDir: String,
id: String,
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -58,7 +58,13 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</h4> ++
appTable
} else {
<h4>No Completed Applications Found</h4>
<h4>No completed applications found!</h4> ++
<p>Did you specify the correct logging directory?
Please verify your setting of <span style="font-style:italic">
spark.history.fs.logDirectory</span> and whether you have the permissions to
access it.<br /> It is also possible that your application did not run to
completion or did not stop the SparkContext.
</p>
}
}
</div>
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,22 +17,22 @@

package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils

/**
* Command-line parser for the history server.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging {
private var propertiesFile: String = null

parse(args.toList)

private def parse(args: List[String]): Unit = {
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
logWarning("Setting log directory through the command line is deprecated as of " +
"Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead.")
conf.set("spark.history.fs.logDirectory", value)
System.setProperty("spark.history.fs.logDirectory", value)
parse(tail)
@@ -78,9 +78,10 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
| (default 50)
|FsHistoryProvider options:
|
| spark.history.fs.logDirectory Directory where app logs are stored (required)
| spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
| default 10)
| spark.history.fs.logDirectory Directory where app logs are stored
| (default: file:/tmp/spark-events)
| spark.history.fs.updateInterval How often to reload log data from storage
| (in seconds, default: 10)
|""".stripMargin)
System.exit(exitCode)
}
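For context, a hedged sketch of supplying these properties programmatically; the HDFS path and interval below are illustrative values, not taken from this patch:

import org.apache.spark.SparkConf

object HistoryServerConfExample {
  def main(args: Array[String]): Unit = {
    // Keys mirror the options documented above; values are examples only.
    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs://namenode:8020/shared/spark-events")
      .set("spark.history.fs.updateInterval", "30") // re-scan the log directory every 30 seconds
    println(conf.get("spark.history.fs.logDirectory"))
  }
}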