SPARK-1786: Edge Partition Serialization #1

Open · wants to merge 18 commits into base: unify-rdds
6 changes: 4 additions & 2 deletions bin/spark-submit
@@ -35,8 +35,10 @@ while (($#)); do
shift
done

if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client" ]; then
export SPARK_MEM=$DRIVER_MEMORY
DEPLOY_MODE=${DEPLOY_MODE:-"client"}

if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
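The spark-submit hunk above defaults DEPLOY_MODE to "client" when it is unset, quotes "$DRIVER_MEMORY" so the test is safe for empty values, and exports SPARK_DRIVER_MEMORY instead of the older SPARK_MEM. For readers following along in Scala, here is a hedged analogue of the same defaulting logic; the object name and printed output are illustrative, not part of the PR:

```scala
// Hedged Scala analogue of the shell change above: default the deploy mode to
// "client" and only surface the driver memory setting in client mode.
object SubmitEnvSketch {
  def main(args: Array[String]): Unit = {
    val deployMode = sys.env.getOrElse("DEPLOY_MODE", "client")   // ${DEPLOY_MODE:-"client"}
    val driverMemory = sys.env.get("DRIVER_MEMORY")               // [ -n "$DRIVER_MEMORY" ]
    if (deployMode == "client") {
      driverMemory.foreach(mem => println(s"SPARK_DRIVER_MEMORY=$mem"))
    }
  }
}
```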
3 changes: 2 additions & 1 deletion core/pom.xml
@@ -140,6 +140,7 @@
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -322,7 +323,7 @@
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning() {
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case t: Throwable => logError("Error in cleaning thread", t)
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case t: Throwable => logError("Error cleaning RDD " + rddId, t)
case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}

@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}

@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
} catch {
case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}

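The ContextCleaner hunks combine two related changes: the cleaning loop now catches only Exception, so fatal errors such as OutOfMemoryError are no longer swallowed, and keepCleaning() is wrapped in Utils.logUncaughtExceptions so anything that does escape is at least logged before the thread dies. Below is a minimal sketch of that wrapper pattern under the assumption of a plain stderr logger; Spark's actual Utils.logUncaughtExceptions may be implemented differently.

```scala
// Sketch only (not Spark's implementation): run a block, log whatever escapes
// it, then rethrow so fatal errors still terminate the thread normally.
object UncaughtLoggingSketch {
  def logUncaughtExceptions[T](block: => T): T = {
    try {
      block
    } catch {
      case t: Throwable =>
        System.err.println(
          s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    // Inside the loop only Exception is caught, mirroring the ContextCleaner change;
    // Errors propagate out to the wrapper instead of being silently ignored.
    logUncaughtExceptions {
      for (i <- 1 to 3) {
        try {
          if (i == 2) throw new RuntimeException("recoverable cleanup failure")
        } catch {
          case e: Exception => System.err.println(s"Error in cleaning thread: $e")
        }
      }
    }
  }
}
```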
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

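In the SparkContext hunks, the catch clauses around the reflective loading of the YARN scheduler and backend classes narrow from Throwable to Exception, so JVM errors are no longer wrapped in a misleading SparkException. A closely related Scala idiom, shown here as a hedged aside rather than something this PR uses, is scala.util.control.NonFatal, which deliberately excludes VirtualMachineError, InterruptedException, and Scala's control-flow throwables so that they keep propagating:

```scala
import scala.util.control.NonFatal

// Illustration only (the exception type and class names here are hypothetical):
// convert non-fatal failures of a reflective lookup into a descriptive error,
// while fatal problems pass through untouched.
object ReflectiveLoadSketch {
  final class SchedulerLoadException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

  def loadClassOrFail(className: String): Class[_] =
    try {
      Class.forName(className)
    } catch {
      case NonFatal(e) =>
        throw new SchedulerLoadException(s"Class $className not available", e)
    }

  def main(args: Array[String]): Unit = {
    println(loadClassOrFail("java.lang.String"))        // resolves fine
    // loadClassOrFail("org.example.MissingScheduler")  // would throw SchedulerLoadException
  }
}
```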
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -281,8 +281,7 @@ object SparkEnv extends Logging {
val jvmInformation = Seq(
("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
("Scala Version", Properties.versionString)
).sorted

// Spark properties
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
Array.empty[Byte]
null
}
} catch {

@@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](

var _nextObj = read()

def hasNext = _nextObj.length != 0
def hasNext = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
@@ -170,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
this.interrupt()
}

override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -281,7 +282,6 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
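The PythonRDD hunks adjust the framing protocol between the JVM and the Python worker: a length of 0 now decodes to a genuine empty element (case 0 => Array.empty[Byte]), while null becomes the end-of-stream sentinel that hasNext checks, instead of the empty array doing double duty for both meanings. The self-contained sketch below illustrates that sentinel pattern with a toy frame source; the class and method names are hypothetical and much simpler than the real reader:

```scala
// Toy sketch of the sentinel change: empty payloads are legitimate elements,
// and only null marks the end of the stream.
class SentinelIterator(frameLengths: Iterator[Int]) extends Iterator[Array[Byte]] {
  // A non-negative frame length produces an element (possibly empty);
  // a negative length stands in for the end-of-stream marker and yields null.
  private def read(): Array[Byte] =
    if (frameLengths.hasNext) {
      frameLengths.next() match {
        case n if n >= 0 => new Array[Byte](n)   // 0 => a real, empty element
        case _           => null                 // end of stream
      }
    } else null

  private var nextObj = read()

  override def hasNext: Boolean = nextObj != null
  override def next(): Array[Byte] = {
    val obj = nextObj
    nextObj = read()
    obj
  }
}

object SentinelIteratorDemo {
  def main(args: Array[String]): Unit = {
    val it = new SentinelIterator(Iterator(3, 0, 5, -1))
    println(it.map(_.length).toList)   // List(3, 0, 5): the empty element is kept
  }
}
```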
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
case e: Throwable => throw e
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -103,7 +103,7 @@ object SparkHadoopUtil {
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
39 changes: 23 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -67,8 +67,7 @@ object SparkSubmit {
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)

/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system propertes, a list of env vars
* and the main class for the child
*/
@@ -115,13 +114,16 @@ object SparkSubmit {
val sysProps = new HashMap[String, String]()
var childMainClass = ""

val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster

if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
if (args.isPython) {
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
@@ -161,35 +163,35 @@ object SparkSubmit {
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)

// For client mode make any added jars immediately visible on the classpath
@@ -212,21 +214,22 @@ object SparkSubmit {
}
}

// For standalone mode, add the application jar automatically so the user doesn't have to
// call sc.addJar. TODO: Standalone mode in the cluster
if (clusterManager == STANDALONE) {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}

// Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
if (args.supervise) {
childArgs += "--supervise"
}

childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
@@ -243,16 +246,20 @@ object SparkSubmit {
}
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}

(childArgs, childClasspath, sysProps, childMainClass)
}

private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
{
private def launch(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
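Most of the SparkSubmit changes above reorganize the OptionAssigner table that routes each user argument either to a command-line option for the child process or to a Spark system property, keyed by cluster manager and deploy mode, and then append the application jar and any default properties. The sketch below imitates that table-driven routing with hypothetical names (Assigner, route); it is not the real SparkSubmit code, just an illustration of the pattern:

```scala
import scala.collection.mutable

// Hypothetical, simplified routing table in the spirit of SparkSubmit's
// OptionAssigner list: each entry states which cluster managers and deploy
// mode it applies to, and whether the value becomes a child CLI option,
// a system property, or both.
object OptionRoutingSketch {
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

  case class Assigner(value: String, clusterManager: Int, deployOnCluster: Boolean,
      clOption: String = null, sysProp: String = null)

  def route(table: Seq[Assigner], clusterManager: Int, deployOnCluster: Boolean)
      : (Seq[String], Map[String, String]) = {
    val childArgs = mutable.ArrayBuffer[String]()
    val sysProps = mutable.HashMap[String, String]()
    for (a <- table) {
      if (a.value != null && (a.clusterManager & clusterManager) != 0 &&
          a.deployOnCluster == deployOnCluster) {
        if (a.clOption != null) childArgs ++= Seq(a.clOption, a.value)
        if (a.sysProp != null) sysProps(a.sysProp) = a.value
      }
    }
    (childArgs.toSeq, sysProps.toMap)
  }

  def main(args: Array[String]): Unit = {
    val table = Seq(
      Assigner("4g", YARN, deployOnCluster = true, clOption = "--executor-memory"),
      Assigner("4g", STANDALONE | MESOS | YARN, deployOnCluster = false,
        sysProp = "spark.executor.memory"))
    // In YARN cluster mode the value becomes a child argument; no sys props are set.
    println(route(table, YARN, deployOnCluster = true))
  }
}
```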
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -70,7 +70,7 @@ class HistoryServer(
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
numCompletedApplications = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
case e: Exception => logError("Exception in checking for event log updates", e)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
case e: Exception =>
logError("Exception in accessing modification time of %s".format(dir.getPath), e)
-1L
}
}
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -684,8 +684,8 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
case e: Exception =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -31,7 +31,7 @@ object DriverWrapper {
case workerUrl :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

// Delegate to supplied main class
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a
// SparkEnv / Executor before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(