rebase as ExecutorRunnableUtil changed to ExecutorRunnable
WangTaoTheTonic committed Jan 9, 2015
2 parents 396c226 + 48cecf6 commit 29b751b
Showing 208 changed files with 4,239 additions and 2,653 deletions.
20 changes: 20 additions & 0 deletions assembly/pom.xml
@@ -354,5 +354,25 @@
</dependency>
</dependencies>
</profile>

<!-- Profiles that disable inclusion of certain dependencies. -->
<profile>
<id>hadoop-provided</id>
<properties>
<hadoop.deps.scope>provided</hadoop.deps.scope>
</properties>
</profile>
<profile>
<id>hive-provided</id>
<properties>
<hive.deps.scope>provided</hive.deps.scope>
</properties>
</profile>
<profile>
<id>parquet-provided</id>
<properties>
<parquet.deps.scope>provided</parquet.deps.scope>
</properties>
</profile>
</profiles>
</project>
15 changes: 0 additions & 15 deletions bagel/pom.xml
@@ -40,15 +40,6 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -58,11 +49,5 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4 changes: 2 additions & 2 deletions bagel/src/test/resources/log4j.properties
@@ -15,10 +15,10 @@
# limitations under the License.
#

# Set everything to be logged to the file bagel/target/unit-tests.log
# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
7 changes: 7 additions & 0 deletions bin/compute-classpath.cmd
@@ -109,6 +109,13 @@ if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
:no_yarn_conf_dir

rem To allow for distributions to append needed libraries to the classpath (e.g. when
rem using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
rem append it to the final classpath.
if not "x%SPARK_DIST_CLASSPATH%"=="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_DIST_CLASSPATH%
)

rem A bit of a hack to allow calling this script within run2.cmd without seeing output
if "%DONT_PRINT_CLASSPATH%"=="1" goto exit

7 changes: 7 additions & 0 deletions bin/compute-classpath.sh
@@ -146,4 +146,11 @@ if [ -n "$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi

# To allow for distributions to append needed libraries to the classpath (e.g. when
# using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
# append it to the final classpath.
if [ -n "$SPARK_DIST_CLASSPATH" ]; then
CLASSPATH="$CLASSPATH:$SPARK_DIST_CLASSPATH"
fi

echo "$CLASSPATH"
5 changes: 5 additions & 0 deletions bin/spark-submit
@@ -38,11 +38,16 @@ while (($#)); do
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
fi
shift
done

DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
if [ "$MASTER" == "yarn-cluster" ]; then
SPARK_SUBMIT_DEPLOY_MODE=cluster
fi
export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}

6 changes: 6 additions & 0 deletions bin/spark-submit2.cmd
@@ -45,11 +45,17 @@ if [%1] == [] goto continue
set SPARK_SUBMIT_CLASSPATH=%2
) else if [%1] == [--driver-java-options] (
set SPARK_SUBMIT_OPTS=%2
) else if [%1] == [--master] (
set MASTER=%2
)
shift
goto loop
:continue

if [%MASTER%] == [yarn-cluster] (
set SPARK_SUBMIT_DEPLOY_MODE=cluster
)

rem For client mode, the driver will be launched in the same JVM that launches
rem SparkSubmit, so we may need to read the properties file for any extra class
rem paths, library paths, java options and memory early on. Otherwise, it will
18 changes: 0 additions & 18 deletions core/pom.xml
@@ -276,11 +276,6 @@
<artifactId>selenium-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
@@ -326,19 +321,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -231,7 +231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

conf.set("spark.executor.id", "driver")
conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
@@ -331,8 +331,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
try {
dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => throw
new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
case e: Exception => {
try {
stop()
} finally {
throw new SparkException("Error while constructing DAGScheduler", e)
}
}
}

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
@@ -1710,19 +1715,19 @@ object SparkContext extends Logging {

// Implicit conversions to common Writable types, for saveAsSequenceFile

implicit def intToIntWritable(i: Int) = new IntWritable(i)
implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

implicit def longToLongWritable(l: Long) = new LongWritable(l)
implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)

implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)

implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)

implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)

implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)

implicit def stringToText(s: String) = new Text(s)
implicit def stringToText(s: String): Text = new Text(s)

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
: ArrayWritable = {
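
The explicit return types added to these implicits change nothing at runtime; they only make the public API self-documenting. As a rough usage sketch of what the conversions enable (assuming a live SparkContext named sc and an invented output path), plain Scala key/value types can be written straight to a SequenceFile:

    import org.apache.spark.SparkContext._

    // With the implicits above in scope, (Int, String) pairs are converted to
    // (IntWritable, Text) when the RDD is written out.
    val pairs = sc.parallelize(Seq((1, "one"), (2, "two")))
    pairs.saveAsSequenceFile("/tmp/pairs-seq")   // hypothetical output path
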
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -160,6 +160,8 @@ object Client {
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
Master.toAkkaUrl(driverArgs.master)
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

actorSystem.awaitTermination()
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.net.URI
import java.util.jar.JarFile

import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -125,14 +126,23 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && primaryResource != null) {
try {
val jar = new JarFile(primaryResource)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
return
val uri = new URI(primaryResource)
val uriScheme = uri.getScheme()

uriScheme match {
case "file" =>
try {
val jar = new JarFile(uri.getPath)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
}

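
The reworked block above only attempts the manifest lookup when the primary resource is a local jar; a remote resource now fails with a clear message asking for --class instead of an opaque JarFile error. A small standalone sketch of the scheme check (the URIs are invented and this is an illustration, not the submit code itself):

    import java.net.URI

    // "file" jars can be opened locally to read Main-Class; other schemes need --class.
    Seq("file:/tmp/app.jar", "hdfs://namenode:8020/jars/app.jar").foreach { res =>
      new URI(res).getScheme() match {
        case "file" => println(s"$res -> read Main-Class from the local jar manifest")
        case scheme => println(s"$res -> scheme '$scheme' is not local; pass --class explicitly")
      }
    }
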
22 changes: 9 additions & 13 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -47,6 +47,8 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)

val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

@@ -75,9 +77,9 @@
}

def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterApplication(appDescription)
}
}
@@ -103,20 +105,14 @@
}

def changeMaster(url: String) {
// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(host, port) =>
Address("akka.tcp", Master.systemName, host, port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
masterAddress = Master.toAkkaAddress(activeMasterUrl)
}

private def isPossibleMaster(remoteUrl: Address) = {
masterUrls.map(s => Master.toAkkaUrl(s))
.map(u => AddressFromURIString(u).hostPort)
.contains(remoteUrl.hostPort)
masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
}

override def receiveWithLogging = {
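
Computing masterAkkaUrls once up front lets each registration retry and the isPossibleMaster check reuse the same precomputed Akka paths, and changeMaster now delegates address parsing to Master.toAkkaAddress instead of a local regex. As a hedged sketch of what these helpers yield for a standalone master URL (host and port invented; the exact actor-path shape is an assumption, not taken from this diff):

    import org.apache.spark.deploy.master.Master

    val masterUrls = Array("spark://localhost:7077", "spark://localhost:7078")
    // toAkkaUrl maps spark://host:port to an actor path, roughly of the form
    //   akka.tcp://sparkMaster@host:port/user/Master
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
    // toAkkaAddress keeps only the Address part (protocol, actor system, host, port).
    val masterAddress = Master.toAkkaAddress(masterUrls.head)
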
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
sparkUser: String,
completed: Boolean = false)

private[spark] abstract class ApplicationHistoryProvider {

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
val isFinishedApplication =
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}

if (isFinishedApplication) {
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
} else {
false
}
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -204,7 +193,7 @@
None
}
}
.sortBy { info => -info.endTime }
.sortBy { info => (-info.endTime, -info.startTime) }

lastModifiedTime = newLastModifiedTime

@@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
appListener.sparkUser.getOrElse(NOT_STARTED))
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
@@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
/** Returns the system's monotonically increasing time. */
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)

/**
* Return true when the application has completed.
*/
private def isApplicationCompleted(entry: FileStatus): Boolean = {
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}
}

}

private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
sparkUser: String,
completed: Boolean = true)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
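
With in-progress event logs no longer filtered out of checkForLogs, the listing can include applications that have not finished, which is why the sort key above becomes (-endTime, -startTime) instead of -endTime alone. A small self-contained sketch of that ordering (ids and timestamps invented; -1L mirrors the getOrElse(-1L) end time of an unfinished application shown above):

    // (appId, startTime, endTime); endTime == -1L means the application is still running.
    val apps = Seq(("app-1", 100L, 500L), ("app-2", 200L, -1L), ("app-3", 300L, -1L))

    // Finished apps sort first by most recent end time; unfinished apps share the same
    // first key and fall back to the most recent start time.
    val ordered = apps.sortBy { case (_, start, end) => (-end, -start) }
    // resulting order of ids: app-1, app-3, app-2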
