Add failing unit tests for standalone log URL viewing
JoshRosen committed Mar 5, 2015
1 parent c250fbe · commit 27918c7
Showing 1 changed file with 45 additions and 10 deletions.

LogUrlsStandaloneSuite.scala
@@ -17,35 +17,70 @@
 
 package org.apache.spark.deploy
 
+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        println(logUrl)
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }
 
-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
 
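Both tests depend on a SaveExecutorInfo listener whose definition sits further down in the same file, outside the hunk shown above. The diff does not include its body, but judging from the imports (mutable, ExecutorInfo, SparkListener, SparkListenerExecutorAdded) it is presumably a small listener along the following lines; this is a sketch under that assumption, not the committed code.

package org.apache.spark.deploy

import scala.collection.mutable

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Sketch of the SaveExecutorInfo helper referenced by both tests; the real
// definition lives below the hunk above and may differ in detail.
private class SaveExecutorInfo extends SparkListener {
  // Records executor id -> ExecutorInfo (which carries logUrlMap) for every executor added.
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    addedExecutorInfos(executorAdded.executorId) = executorAdded.executorInfo
  }
}

Because executor-added events arrive asynchronously, both tests call sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) before asserting on addedExecutorInfos. The clone override on MySparkConf in the second test matters for a similar reason: SparkContext clones the SparkConf it is given, so a clone that returned a plain SparkConf would silently drop the faked SPARK_PUBLIC_DNS lookup.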