
Commit

Merge branch 'master' into SPARK-3454_w_jersey
Conflicts:
	core/pom.xml
	core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	core/src/main/scala/org/apache/spark/ui/WebUI.scala
squito committed Mar 17, 2015
2 parents 56d2fc7 + 4cca391 commit e91750a
Showing 94 changed files with 1,154 additions and 557 deletions.
1 change: 0 additions & 1 deletion bin/pyspark
@@ -89,7 +89,6 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
 if [[ -n "$SPARK_TESTING" ]]; then
 unset YARN_CONF_DIR
 unset HADOOP_CONF_DIR
-export PYSPARK_SUBMIT_ARGS=pyspark-shell
 if [[ -n "$PYSPARK_DOC_TEST" ]]; then
 exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
 else
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -429,7 +429,7 @@
 <overWriteIfNewer>true</overWriteIfNewer>
 <useSubDirectoryPerType>true</useSubDirectoryPerType>
 <includeArtifactIds>
-guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,asm
+guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,asm
 </includeArtifactIds>
 <silent>true</silent>
 </configuration>
@@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.StatCounter
 import org.apache.spark.util.Utils

-class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
+class JavaDoubleRDD(val srdd: RDD[scala.Double])
+  extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {

 override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]

@@ -44,7 +44,7 @@ import org.apache.spark.util.Utils

 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
-  extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+  extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

 override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils

 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
-  extends JavaRDDLike[T, JavaRDD[T]] {
+  extends AbstractJavaRDDLike[T, JavaRDD[T]] {

 override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)

@@ -38,6 +38,14 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils

+/**
+ * As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
+ * of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
+ * from the trait. See SPARK-3266 for additional details.
+ */
+private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
+  extends JavaRDDLike[T, This]
+
 /**
  * Defines operations common to several Java RDD implementations.
  * Note that this trait is not intended to be implemented by user code.
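Aside (not part of the commit): the SI-8905 workaround described above can be shown in isolation with a minimal, hypothetical sketch — a trait with a concrete member, a dummy abstract class, and an implementation that extends the abstract class rather than the trait. The names Ops, AbstractOps, and Wrapper are illustrative only, not Spark code.

// Minimal sketch of the AbstractJavaRDDLike pattern; illustrative names only.
trait Ops[T, This <: Ops[T, This]] {
  def first: T
  // Concrete trait method, analogous to the default implementations in JavaRDDLike.
  def describe(): String = "first element: " + first
}

// Dummy abstract class: concrete implementations extend this instead of the trait,
// mirroring AbstractJavaRDDLike as a workaround for SI-8905.
abstract class AbstractOps[T, This <: Ops[T, This]] extends Ops[T, This]

class Wrapper[T](elems: Seq[T]) extends AbstractOps[T, Wrapper[T]] {
  override def first: T = elems.head
}

With this shape, new Wrapper(Seq(1, 2, 3)).describe() returns "first element: 1", exactly as if Wrapper had extended the trait directly.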
@@ -28,7 +28,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
 /**
 * Command-line parser for the driver client.
 */
-private[spark] class ClientArguments(args: Array[String]) {
+private[deploy] class ClientArguments(args: Array[String]) {
 import ClientArguments._

 var cmd: String = "" // 'launch' or 'kill'
@@ -96,7 +96,7 @@ private[spark] class ClientArguments(args: Array[String]) {
 /**
 * Print usage and exit JVM with the given exit code.
 */
-  def printUsageAndExit(exitCode: Int) {
+  private def printUsageAndExit(exitCode: Int) {
 // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
 // separately similar to in the YARN client.
 val usage =
@@ -116,10 +116,10 @@ private[spark] class ClientArguments(args: Array[String]) {
 }
 }

-object ClientArguments {
-  private[spark] val DEFAULT_CORES = 1
-  private[spark] val DEFAULT_MEMORY = 512 // MB
-  private[spark] val DEFAULT_SUPERVISE = false
+private[deploy] object ClientArguments {
+  val DEFAULT_CORES = 1
+  val DEFAULT_MEMORY = 512 // MB
+  val DEFAULT_SUPERVISE = false

 def isValidJarUrl(s: String): Boolean = {
 try {
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-private[spark] class DriverDescription(
+private[deploy] class DriverDescription(
 val jarUrl: String,
 val mem: Int,
 val cores: Int,
@@ -22,7 +22,7 @@ package org.apache.spark.deploy
 * This state is sufficient for the Master to reconstruct its internal data structures during
 * failover.
 */
-private[spark] class ExecutorDescription(
+private[deploy] class ExecutorDescription(
 val appId: String,
 val execId: Int,
 val cores: Int,
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-private[spark] object ExecutorState extends Enumeration {
+private[deploy] object ExecutorState extends Enumeration {

 val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

@@ -55,29 +55,29 @@ import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
 * - The docker images tagged spark-test-master and spark-test-worker are built from the
 * docker/ directory. Run 'docker/spark-test/build' to generate these.
 */
-private[spark] object FaultToleranceTest extends App with Logging {
+private object FaultToleranceTest extends App with Logging {

-  val conf = new SparkConf()
-  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+  private val conf = new SparkConf()
+  private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

-  val masters = ListBuffer[TestMasterInfo]()
-  val workers = ListBuffer[TestWorkerInfo]()
-  var sc: SparkContext = _
+  private val masters = ListBuffer[TestMasterInfo]()
+  private val workers = ListBuffer[TestWorkerInfo]()
+  private var sc: SparkContext = _

-  val zk = SparkCuratorUtil.newClient(conf)
+  private val zk = SparkCuratorUtil.newClient(conf)

-  var numPassed = 0
-  var numFailed = 0
+  private var numPassed = 0
+  private var numFailed = 0

-  val sparkHome = System.getenv("SPARK_HOME")
+  private val sparkHome = System.getenv("SPARK_HOME")
 assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")

-  val containerSparkHome = "/opt/spark"
-  val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
+  private val containerSparkHome = "/opt/spark"
+  private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)

 System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip

-  def afterEach() {
+  private def afterEach() {
 if (sc != null) {
 sc.stop()
 sc = null
@@ -179,7 +179,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
 }
 }

-  def test(name: String)(fn: => Unit) {
+  private def test(name: String)(fn: => Unit) {
 try {
 fn
 numPassed += 1
@@ -197,19 +197,19 @@ private[spark] object FaultToleranceTest extends App with Logging {
 afterEach()
 }

-  def addMasters(num: Int) {
+  private def addMasters(num: Int) {
 logInfo(s">>>>> ADD MASTERS $num <<<<<")
 (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
 }

-  def addWorkers(num: Int) {
+  private def addWorkers(num: Int) {
 logInfo(s">>>>> ADD WORKERS $num <<<<<")
 val masterUrls = getMasterUrls(masters)
 (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
 }

 /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
-  def createClient() = {
+  private def createClient() = {
 logInfo(">>>>> CREATE CLIENT <<<<<")
 if (sc != null) { sc.stop() }
 // Counter-hack: Because of a hack in SparkEnv#create() that changes this
@@ -218,27 +218,27 @@ private[spark] object FaultToleranceTest extends App with Logging {
 sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
 }

-  def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
+  private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
 "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
 }

-  def getLeader: TestMasterInfo = {
+  private def getLeader: TestMasterInfo = {
 val leaders = masters.filter(_.state == RecoveryState.ALIVE)
 assertTrue(leaders.size == 1)
 leaders(0)
 }

-  def killLeader(): Unit = {
+  private def killLeader(): Unit = {
 logInfo(">>>>> KILL LEADER <<<<<")
 masters.foreach(_.readState())
 val leader = getLeader
 masters -= leader
 leader.kill()
 }

-  def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
+  private def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

-  def terminateCluster() {
+  private def terminateCluster() {
 logInfo(">>>>> TERMINATE CLUSTER <<<<<")
 masters.foreach(_.kill())
 workers.foreach(_.kill())
@@ -247,7 +247,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
 }

 /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
-  def assertUsable() = {
+  private def assertUsable() = {
 val f = future {
 try {
 val res = sc.parallelize(0 until 10).collect()
@@ -269,7 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
 * Asserts that the cluster is usable and that the expected masters and workers
 * are all alive in a proper configuration (e.g., only one leader).
 */
-  def assertValidClusterState() = {
+  private def assertValidClusterState() = {
 logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
 assertUsable()
 var numAlive = 0
@@ -325,7 +325,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
 }
 }

-  def assertTrue(bool: Boolean, message: String = "") {
+  private def assertTrue(bool: Boolean, message: String = "") {
 if (!bool) {
 throw new IllegalStateException("Assertion failed: " + message)
 }
@@ -335,7 +335,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
 numFailed))
 }

-private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
 extends Logging {

 implicit val formats = org.json4s.DefaultFormats
@@ -377,7 +377,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
 format(ip, dockerId.id, logFile.getAbsolutePath, state)
 }

-private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
 extends Logging {

 implicit val formats = org.json4s.DefaultFormats
@@ -390,7 +390,7 @@ private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val
 "[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
 }

-private[spark] object SparkDocker {
+private object SparkDocker {
 def startMaster(mountDir: String): TestMasterInfo = {
 val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
 val (ip, id, outFile) = startNode(cmd)
@@ -425,11 +425,11 @@ private[spark] object SparkDocker {
 }
 }

-private[spark] class DockerId(val id: String) {
+private class DockerId(val id: String) {
 override def toString = id
 }

-private[spark] object Docker extends Logging {
+private object Docker extends Logging {
 def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
 val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

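Aside (not part of the commit): the test helper shown above takes its body as a by-name parameter, so each fault-tolerance scenario reads as a block passed to test(name) { ... }. A minimal standalone sketch of that pattern follows; the harness name and the scenarios are hypothetical, not taken from the diff.

// Minimal by-name test harness, mirroring the shape of FaultToleranceTest.test.
object MiniHarness {
  private var numPassed = 0
  private var numFailed = 0

  private def test(name: String)(fn: => Unit): Unit = {
    try {
      fn // the block supplied at the call site runs lazily, here
      numPassed += 1
      println(s"PASSED: $name")
    } catch {
      case e: Exception =>
        numFailed += 1
        println(s"FAILED: $name (${e.getMessage})")
    }
  }

  def main(args: Array[String]): Unit = {
    test("cluster is usable") { require(1 + 1 == 2) }
    test("single leader") { require(Seq("m1").size == 1, "expected exactly one leader") }
    println(s"passed=$numPassed, failed=$numFailed")
  }
}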
26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -24,6 +24,32 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner

 private[spark] object JsonProtocol {
+  def writeWorkerInfo(obj: WorkerInfo) = {
+    ("id" -> obj.id) ~
+    ("host" -> obj.host) ~
+    ("port" -> obj.port) ~
+    ("webuiaddress" -> obj.webUiAddress) ~
+    ("cores" -> obj.cores) ~
+    ("coresused" -> obj.coresUsed) ~
+    ("coresfree" -> obj.coresFree) ~
+    ("memory" -> obj.memory) ~
+    ("memoryused" -> obj.memoryUsed) ~
+    ("memoryfree" -> obj.memoryFree) ~
+    ("state" -> obj.state.toString) ~
+    ("lastheartbeat" -> obj.lastHeartbeat)
+  }
+
+  def writeApplicationInfo(obj: ApplicationInfo) = {
+    ("starttime" -> obj.startTime) ~
+    ("id" -> obj.id) ~
+    ("name" -> obj.desc.name) ~
+    ("cores" -> obj.desc.maxCores) ~
+    ("user" -> obj.desc.user) ~
+    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("submitdate" -> obj.submitDate.toString) ~
+    ("state" -> obj.state.toString) ~
+    ("duration" -> obj.duration)
+  }
+
 def writeApplicationDescription(obj: ApplicationDescription) = {
 ("name" -> obj.name) ~
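Aside (not part of the commit): the writeWorkerInfo and writeApplicationInfo methods added above build json4s JValue objects with the ~ DSL. A minimal sketch of the same pattern follows, using a stand-in case class (Worker here is hypothetical, not Spark's WorkerInfo) and assuming json4s with the Jackson backend is on the classpath.

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JsonSketch {
  // Stand-in for a worker record; illustrative only.
  case class Worker(id: String, host: String, port: Int, cores: Int, memory: Int)

  // Same ~-chaining style as JsonProtocol.writeWorkerInfo, applied to the stand-in type.
  def writeWorker(w: Worker) =
    ("id" -> w.id) ~
    ("host" -> w.host) ~
    ("port" -> w.port) ~
    ("cores" -> w.cores) ~
    ("memory" -> w.memory)

  def main(args: Array[String]): Unit = {
    // compact(render(...)) serializes the JValue built by the DSL, producing e.g.
    // {"id":"w-1","host":"localhost","port":7078,"cores":4,"memory":2048}
    println(compact(render(writeWorker(Worker("w-1", "localhost", 7078, 4, 2048)))))
  }
}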