Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used #4903

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LocalSparkCluster(
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum))
memoryPerWorker, masters, null, Some(workerNum), _conf)
workerActorSystems += workerSystem
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private[spark] class Master(
val webUi = new MasterWebUI(this, webUiPort)

val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val webUiPort: Int,
val publicAddress: String,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
Expand Down Expand Up @@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

// Add webUI log urls
val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private[spark] class Worker(
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)

val publicAddress = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just occurred to me that this might not be the right publicAddress to use: I think that we allow the --host flag to be passed when launching the worker, in which case this will default to the value of that flag, whereas the UI defaults to Utils.localHostname(), so there's still a slight chance for a discrepancy here. Maybe I can grab the bound host out of the web UI instance instead, just to be safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably fine as-is, though, since this publicAddress is used to construct the worker web UI links in WorkerInfo.scala.

val envVar = System.getenv("SPARK_PUBLIC_DNS")
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
var webUi: WorkerWebUI = null
Expand Down Expand Up @@ -362,7 +362,8 @@ private[spark] class Worker(
self,
workerId,
host,
webUiPort,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
akkaUrl,
Expand Down Expand Up @@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None): (ActorSystem, Int) = {
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor with a custom conf is only called from LocalSparkCluster. We need to use the supplied SparkConf here so that the environment variable mocking works correctly.


// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostName()
protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,69 @@

package org.apache.spark.deploy

import java.net.URL

import scala.collection.mutable
import scala.io.Source

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.FunSuite

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}

class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
private val WAIT_TIMEOUT_MILLIS = 10000

before {
test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,512]", "test")

val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
info.logUrlMap.foreach { case (logType, logUrl) =>
val html = Source.fromURL(logUrl).mkString
assert(html.contains(s"$logType log page"))
}
}
}

test("verify log urls get propagated from workers") {
test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
val SPARK_PUBLIC_DNS = "public_dns"
class MySparkConf extends SparkConf(false) {
override def getenv(name: String) = {
if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
else super.getenv(name)
}

override def clone: SparkConf = {
new MySparkConf().setAll(getAll)
}
}
val conf = new MySparkConf()
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)

val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
info.logUrlMap.values.foreach { logUrl =>
assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
"publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
Expand Down