[SPARK-3490] Disable SparkUI for tests (backport into 1.1) #2415

Closed
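In brief: SparkContext's SparkUI (and StreamingContext's StreamingTab) become `Option`s gated by a new `spark.ui.enabled` flag, which defaults to `true`, every call site switches to `map`/`foreach`, and the Maven and sbt test harnesses set the flag to `false`. A minimal sketch of the gating pattern, with illustrative stand-in types (`WebServer` and `Server` are not Spark classes):

```scala
import org.apache.spark.SparkConf

// Illustrative stand-in for a component we may not want to start (e.g. the web UI).
class WebServer {
  def bind(): Unit = println("UI bound")
  def stop(): Unit = println("UI stopped")
}

// Sketch of the gating pattern this PR applies inside SparkContext.
class Server(conf: SparkConf) {
  // Construct the UI only when enabled; the test harnesses set spark.ui.enabled=false.
  val ui: Option[WebServer] =
    if (conf.getBoolean("spark.ui.enabled", defaultValue = true)) Some(new WebServer) else None

  ui.foreach(_.bind())                     // no-op when the UI is disabled
  def stop(): Unit = ui.foreach(_.stop())  // likewise on shutdown
}
```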
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI, registering all associated listeners
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
+  private[spark] val ui: Option[SparkUI] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new SparkUI(this))
+    } else {
+      // For tests, do not enable the UI
+      None
+    }
+  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
@@ -1008,7 +1014,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     postApplicationEnd()
-    ui.stop()
+    ui.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
}
}
@@ -46,16 +46,17 @@ private[spark] class SimrSchedulerBackend(

     val conf = new Configuration()
     val fs = FileSystem.get(conf)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: " + driverFilePath)
     logInfo("Writing Akka address: " + driverUrl)
-    logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+    logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file
     val temp = fs.create(tmpPath, true)
     temp.writeUTF(driverUrl)
     temp.writeInt(maxCores)
-    temp.writeUTF(sc.ui.appUIAddress)
+    temp.writeUTF(appUIAddress)
     temp.close()
 
     // "Atomic" rename
@@ -63,8 +63,10 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      appUIAddress, eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
44 changes: 31 additions & 13 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -36,11 +36,25 @@ import scala.xml.Node

 class UISuite extends FunSuite {
 
+  /**
+   * Create a test SparkContext with the SparkUI enabled.
+   * It is safe to `get` the SparkUI directly from the SparkContext returned here.
+   */
+  private def newSparkContext(): SparkContext = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val sc = new SparkContext(conf)
+    assert(sc.ui.isDefined)
+    sc
+  }
+
   ignore("basic ui visibility") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase.contains("stages"))
         assert(html.toLowerCase.contains("storage"))
@@ -51,7 +65,7 @@
   }
 
   ignore("visibility at localhost:4040") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         val html = Source.fromURL("http://localhost:4040").mkString
@@ -61,8 +75,8 @@
   }
 
   ignore("attaching a new tab") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val sparkUI = sc.ui
+    withSpark(newSparkContext()) { sc =>
+      val sparkUI = sc.ui.get
 
       val newTab = new WebUITab(sparkUI, "foo") {
         attachPage(new WebUIPage("") {
@@ -73,7 +87,7 @@
       }
       sparkUI.attachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress).mkString
+        val html = Source.fromURL(sparkUI.appUIAddress).mkString
         assert(!html.contains("random data that should not be present"))
 
         // check whether new page exists
@@ -87,7 +101,7 @@
       }
 
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+        val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
         // check whether new page exists
         assert(html.contains("magic"))
       }
@@ -129,16 +143,20 @@ class UISuite extends FunSuite {
   }
 
   test("verify appUIAddress contains the scheme") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val uiAddress = sc.ui.appUIAddress
-      assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val uiAddress = ui.appUIAddress
+      val uiHostPort = ui.appUIHostPort
+      assert(uiAddress.equals("http://" + uiHostPort))
     }
   }
 
   test("verify appUIAddress contains the port") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val splitUIAddress = sc.ui.appUIAddress.split(':')
-      assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      val splitUIAddress = ui.appUIAddress.split(':')
+      val boundPort = ui.boundPort
+      assert(splitUIAddress(2).toInt == boundPort)
     }
   }
 }
1 change: 1 addition & 0 deletions pom.xml
@@ -887,6 +887,7 @@
             <java.awt.headless>true</java.awt.headless>
             <spark.test.home>${session.executionRootDirectory}</spark.test.home>
             <spark.testing>1</spark.testing>
+            <spark.ui.enabled>false</spark.ui.enabled>
           </systemProperties>
         </configuration>
         <executions>
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -337,7 +337,7 @@ object TestSettings {
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.ports.maxRetries=100",
javaOptions in Test += "-Dspark.ui.port=0",
javaOptions in Test += "-Dspark.ui.enabled=false",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
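Both build changes rely on the same propagation path: a default `SparkConf` copies every JVM system property whose name starts with `spark.`, so the `-Dspark.ui.enabled=false` set by Maven's `<systemProperties>` and sbt's `javaOptions` reaches the `conf.getBoolean("spark.ui.enabled", true)` check in SparkContext with no further plumbing. A small sketch of that path:

```scala
import org.apache.spark.SparkConf

object UiFlagDemo extends App {
  // Simulate what the test harness does via -Dspark.ui.enabled=false.
  sys.props("spark.ui.enabled") = "false"

  // new SparkConf() loads defaults, copying all "spark.*" system properties.
  val conf = new SparkConf()
  assert(!conf.getBoolean("spark.ui.enabled", defaultValue = true))
  println("The SparkUI would be disabled in this JVM")
}
```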
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
 import org.apache.spark.util.MetadataCleaner
 
 /**
@@ -158,7 +158,14 @@ class StreamingContext private[streaming] (

   private[streaming] val waiter = new ContextWaiter
 
-  private[streaming] val uiTab = new StreamingTab(this)
+  private[streaming] val progressListener = new StreamingJobProgressListener(this)
+
+  private[streaming] val uiTab: Option[StreamingTab] =
+    if (conf.getBoolean("spark.ui.enabled", true)) {
+      Some(new StreamingTab(this))
+    } else {
+      None
+    }
 
   /** Register streaming source to metrics system */
   private val streamingSource = new StreamingSource(this)
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
   override val metricRegistry = new MetricRegistry
   override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
 
-  private val streamingListener = ssc.uiTab.listener
+  private val streamingListener = ssc.progressListener
 
   private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
       defaultValue: T) {
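The two streaming changes above are coupled: StreamingJobProgressListener used to be owned by the UI tab, and StreamingSource reached it through `ssc.uiTab.listener`. With the tab now optional, the listener moves onto StreamingContext itself so the metrics source keeps working when the UI is off. A toy sketch of the decoupling (`Listener`, `Context`, and `MetricsSource` are illustrative, not Spark classes):

```scala
// Toy model: the context owns the listener; the metrics source and the
// optional UI tab both borrow it, so disabling the UI cannot break metrics.
class Listener { var completedBatches: Int = 0 }

class Context(uiEnabled: Boolean) {
  val progressListener = new Listener  // always constructed
  val uiTab: Option[Listener] = if (uiEnabled) Some(progressListener) else None
}

class MetricsSource(ctx: Context) {
  // Reads the context-owned listener, with or without a UI tab.
  def completedBatches: Int = ctx.progressListener.completedBatches
}
```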
@@ -17,18 +17,31 @@

 package org.apache.spark.streaming.ui
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.SparkUITab
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-/** Spark Web UI tab that shows statistics of a streaming job */
+import StreamingTab._
+
+/**
+ * Spark Web UI tab that shows statistics of a streaming job.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
 private[spark] class StreamingTab(ssc: StreamingContext)
-  extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
+  extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
 
-  val parent = ssc.sc.ui
-  val listener = new StreamingJobProgressListener(ssc)
+  val parent = getSparkUI(ssc)
+  val listener = ssc.progressListener
 
   ssc.addStreamingListener(listener)
   attachPage(new StreamingPage(this))
   parent.attachTab(this)
 }
+
+private object StreamingTab {
+  def getSparkUI(ssc: StreamingContext): SparkUI = {
+    ssc.sc.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach this tab to not found!")
+    }
+  }
+}
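A note on the fail-fast choice: StreamingTab is only ever constructed after StreamingContext has checked `spark.ui.enabled`, so `getSparkUI` unwraps the `Option` with `getOrElse` and throws a SparkException immediately rather than letting a missing UI surface later as a NullPointerException. The same unwrap in generic form (`requireDefined` is a hypothetical helper, not part of Spark):

```scala
import org.apache.spark.SparkException

// Hypothetical helper mirroring the shape of StreamingTab.getSparkUI.
def requireDefined[A](dep: Option[A], what: String): A =
  dep.getOrElse(throw new SparkException(s"$what not found!"))
```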
16 changes: 12 additions & 4 deletions streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,13 +24,22 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkConf
+
 class UISuite extends FunSuite {
 
   // Ignored: See SPARK-1530
   ignore("streaming tab in spark UI") {
-    val ssc = new StreamingContext("local", "test", Seconds(1))
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val ssc = new StreamingContext(conf, Seconds(1))
+    assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+    val ui = ssc.sc.ui.get
+
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+      val html = Source.fromURL(ui.appUIAddress).mkString
       assert(!html.contains("random data that should not be present"))
       // test if streaming tab exist
       assert(html.toLowerCase.contains("streaming"))
@@ -39,8 +48,7 @@
     }
 
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(
-        ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+      val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
       assert(html.toLowerCase.contains("batch"))
       assert(html.toLowerCase.contains("network"))
     }
@@ -219,7 +219,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       assert(sparkContext != null || count >= numTries)
 
       if (null != sparkContext) {
-        uiAddress = sparkContext.ui.appUIHostPort
+        uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("")
         uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
         this.yarnAllocator = YarnAllocationHandler.newAllocator(
           yarnConf,
@@ -55,8 +55,11 @@ private[spark] class YarnClientSchedulerBackend(
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
sc.ui.foreach { ui =>
conf.set("spark.driver.appUIAddress", ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress",
YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
}

val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@@ -198,7 +198,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       assert(sparkContext != null || numTries >= maxNumTries)
 
       if (sparkContext != null) {
-        uiAddress = sparkContext.ui.appUIHostPort
+        uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("")
         uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
         this.yarnAllocator = YarnAllocationHandler.newAllocator(
           yarnConf,