Commit 4df5be6
Merge remote-tracking branch 'asf/master' into spark-sink-test
harishreedharan committed Aug 19, 2014
2 parents c9190d1 + cbfc26b commit 4df5be6
Showing 82 changed files with 1,514 additions and 296 deletions.
32 changes: 0 additions & 32 deletions .travis.yml

This file was deleted.

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

@@ -17,8 +17,7 @@
 
 package org.apache.spark.broadcast
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
-  ObjectInputStream, ObjectOutputStream, OutputStream}
+import java.io._
 
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
   private val broadcastId = BroadcastBlockId(id)
 
-  TorrentBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-  }
+  SparkEnv.get.blockManager.putSingle(
+    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
   @transient private var arrayOfBlocks: Array[TorrentBlock] = null
   @transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     // Store meta-info
     val metaId = BroadcastBlockId(id, "meta")
     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
-    TorrentBroadcast.synchronized {
-      SparkEnv.get.blockManager.putSingle(
-        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    }
+    SparkEnv.get.blockManager.putSingle(
+      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // Store individual pieces
     for (i <- 0 until totalBlocks) {
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.putSingle(
-          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-      }
+      SparkEnv.get.blockManager.putSingle(
+        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
   }
 
@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val metaId = BroadcastBlockId(id, "meta")
     var attemptId = 10
    while (attemptId > 0 && totalBlocks == -1) {
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(metaId) match {
-          case Some(x) =>
-            val tInfo = x.asInstanceOf[TorrentInfo]
-            totalBlocks = tInfo.totalBlocks
-            totalBytes = tInfo.totalBytes
-            arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
-            hasBlocks = 0
-
-          case None =>
-            Thread.sleep(500)
-        }
+      SparkEnv.get.blockManager.getSingle(metaId) match {
+        case Some(x) =>
+          val tInfo = x.asInstanceOf[TorrentInfo]
+          totalBlocks = tInfo.totalBlocks
+          totalBytes = tInfo.totalBytes
+          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
+          hasBlocks = 0
+
+        case None =>
+          Thread.sleep(500)
       }
       attemptId -= 1
     }
+
     if (totalBlocks == -1) {
       return false
     }
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
     for (pid <- recvOrder) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(pieceId) match {
-          case Some(x) =>
-            arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
-            hasBlocks += 1
-            SparkEnv.get.blockManager.putSingle(
-              pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      SparkEnv.get.blockManager.getSingle(pieceId) match {
+        case Some(x) =>
+          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
+          hasBlocks += 1
+          SparkEnv.get.blockManager.putSingle(
+            pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+        case None =>
+          throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
       }
     }
 
@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
-    synchronized {
-      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
-    }
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
   }
 }
 
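Taken together, the TorrentBroadcast hunks above make one change: the JVM-wide TorrentBroadcast.synchronized wrapper around every BlockManager call is dropped, since the block manager handles its own synchronization and a single global lock needlessly serializes unrelated broadcasts. A minimal sketch of the before/after pattern, using a stand-in store rather than the real BlockManager:

import java.util.concurrent.ConcurrentHashMap

// Stand-in for BlockManager: internally thread-safe, so callers need no extra lock.
class BlockStore {
  private val blocks = new ConcurrentHashMap[String, AnyRef]()
  def putSingle(id: String, value: AnyRef): Unit = blocks.put(id, value)
  def getSingle(id: String): Option[AnyRef] = Option(blocks.get(id))
}

object LockingDemo {
  private val store = new BlockStore

  // Before: every put from every broadcast contends on one global lock.
  def putLocked(id: String, value: AnyRef): Unit =
    LockingDemo.synchronized { store.putSingle(id, value) }

  // After: rely on the store's own thread safety; puts proceed concurrently.
  def put(id: String, value: AnyRef): Unit = store.putSingle(id, value)
}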
core/src/main/scala/org/apache/spark/deploy/master/Master.scala

@@ -697,7 +697,7 @@ private[spark] class Master(
         appIdToUI(app.id) = ui
         webUi.attachSparkUI(ui)
         // Application UI is successfully rebuilt, so link the Master UI to it
-        app.desc.appUiUrl = ui.basePath
+        app.desc.appUiUrl = ui.getBasePath
         true
       } catch {
         case e: Exception =>
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

@@ -467,7 +467,7 @@ private[spark] class ConnectionManager(
 
     val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
     if (!sendingConnectionOpt.isDefined) {
-      logError("Corresponding SendingConnectionManagerId not found")
+      logError(s"Corresponding SendingConnection to ${remoteConnectionManagerId} not found")
       return
     }
 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

@@ -54,7 +54,8 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
+    .toLowerCase + "-" + System.currentTimeMillis
   val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
 
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
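The EventLoggingListener hunk above tightens log-directory naming: spaces, colons and slashes were already mapped to "-"; now shell- and path-sensitive characters ($ { } ' ") are mapped to "_" before lowercasing. A standalone illustration of the same expression (the sample app name is invented):

object LogNameDemo extends App {
  // The sanitization expression from the diff above, pulled into a function.
  def sanitizedLogName(appName: String): String =
    appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
      .toLowerCase + "-" + System.currentTimeMillis

  println(sanitizedLogName("My App: test/${USER}"))
  // prints something like: my-app--test-__user_-1408500000000
}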
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -76,6 +76,8 @@ private[spark] class SparkUI(
     }
   }
 
+  def getAppName = appName
+
   /** Set the app name for this UI. */
   def setAppName(name: String) {
     appName = name
@@ -100,6 +102,13 @@
   private[spark] def appUIAddress = s"http://$appUIHostPort"
 }
 
+private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
+  extends WebUITab(parent, prefix) {
+
+  def appName: String = parent.getAppName
+
+}
+
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
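The SparkUITab class added above exists so that tabs stop caching the app name: a def that delegates to the parent always sees the current value, whereas the stored copies being deleted in the tab diffs below (val appName = parent.appName) would go stale if the name changed after construction. A self-contained sketch of the pattern, with simplified stand-ins for SparkUI and WebUITab:

class ParentUI(private var name: String) {
  def getAppName: String = name
  def setAppName(n: String): Unit = { name = n }
}

// Like SparkUITab: no stored copy, just read-through delegation to the parent UI.
abstract class UITab(parent: ParentUI, val prefix: String) {
  def appName: String = parent.getAppName
}

class DemoTab(parent: ParentUI) extends UITab(parent, "environment")

object TabDemo extends App {
  val ui = new ParentUI("app-1")
  val tab = new DemoTab(ui)
  ui.setAppName("app-2")
  println(tab.appName) // prints "app-2"; a stored val copy would still hold "app-1"
}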
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -163,17 +163,15 @@ private[spark] object UIUtils extends Logging {
 
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(
-      content: => Seq[Node],
-      basePath: String,
-      appName: String,
       title: String,
-      tabs: Seq[WebUITab],
-      activeTab: WebUITab,
+      content: => Seq[Node],
+      activeTab: SparkUITab,
       refreshInterval: Option[Int] = None): Seq[Node] = {
 
-    val header = tabs.map { tab =>
+    val appName = activeTab.appName
+    val header = activeTab.headerTabs.map { tab =>
       <li class={if (tab == activeTab) "active" else ""}>
-        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+        <a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix)}>{tab.name}</a>
       </li>
     }
 
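The headerSparkPage rewrite above is a parameter-object refactor: instead of threading content, basePath, appName, tabs and activeTab through every call, callers now pass the title, the content, and the active SparkUITab, which supplies appName, basePath and headerTabs itself (see the one-line call sites in the page diffs below). A sketch of the shape with stand-in types, not the real Spark UI classes:

final case class Tab(name: String, prefix: String)

// Stand-in for SparkUITab: one value that knows its app name, base path and peer tabs.
class ActiveTab(val appName: String, val basePath: String, val headerTabs: Seq[Tab])

object HeaderDemo extends App {
  // New shape: title, content, active tab; the tab supplies the rest.
  def headerSparkPage(title: String, content: String, activeTab: ActiveTab): String = {
    val tabNames = activeTab.headerTabs.map(_.name).mkString(", ")
    s"${activeTab.appName} | $title | tabs: $tabNames | base: ${activeTab.basePath}\n$content"
  }

  val tab = new ActiveTab("app-1", "/proxy/app-1", Seq(Tab("Stages", "stages")))
  println(headerSparkPage("Spark Stages", "<page body>", tab))
}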
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -50,6 +50,7 @@ private[spark] abstract class WebUI(
   protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 
+  def getBasePath: String = basePath
   def getTabs: Seq[WebUITab] = tabs.toSeq
   def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
   def getSecurityManager: SecurityManager = securityManager
@@ -135,6 +136,8 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
 
   /** Get a list of header tabs from the parent UI. */
   def headerTabs: Seq[WebUITab] = parent.getTabs
+
+  def basePath: String = parent.getBasePath
 }
 
 
core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala

@@ -24,8 +24,6 @@ import scala.xml.Node
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -45,7 +43,7 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
       <h4>Classpath Entries</h4> {classpathEntriesTable}
     </span>
 
-    UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Environment", content, parent)
   }
 
   private def propertyHeader = Seq("Name", "Value")
core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala

@@ -21,9 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.ui._
 
-private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
   val listener = new EnvironmentListener
 
   attachPage(new EnvironmentPage(this))
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

@@ -43,8 +43,6 @@ private case class ExecutorSummaryInfo(
     maxMemory: Long)
 
 private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -101,8 +99,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       </div>
     </div>;
 
-    UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
-      parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
   }
 
   /** Render an HTML row representing an executor */
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

@@ -23,11 +23,9 @@ import org.apache.spark.ExceptionFailure
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.{SparkUI, WebUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = new ExecutorsListener(parent.storageStatusListener)
 
   attachPage(new ExecutorsPage(this))
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala

@@ -26,8 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
@@ -94,7 +92,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
       <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
       failedStagesTable.toNodeSeq
 
-      UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
+      UIUtils.headerSparkPage("Spark Stages", content, parent)
     }
   }
 }
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala

@@ -21,12 +21,10 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, WebUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
   val live = parent.live
   val sc = parent.sc
   val conf = if (live) sc.conf else new SparkConf
@@ -53,4 +51,5 @@ private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
       Thread.sleep(100)
     }
   }
+
 }
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -26,8 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
@@ -51,8 +49,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
       <h4>Summary </h4> ++ poolTable.toNodeSeq ++
       <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
 
-      UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
-        parent.headerTabs, parent)
+      UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
     }
   }
 }
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -25,7 +25,6 @@ import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
 private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
-  private val basePath = parent.basePath
   private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
@@ -59,11 +58,11 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
       case Some(stages) => stages.size
       case None => 0
     }
+    val href = "%s/stages/pool?poolname=%s"
+      .format(UIUtils.prependBaseUri(parent.basePath), p.name)
     <tr>
       <td>
-        <a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(basePath), p.name)}>
-          {p.name}
-        </a>
+        <a href={href}>{p.name}</a>
       </td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
(Diffs for the remaining changed files are not shown here.)
