Update
Sephiroth-Lin committed Apr 22, 2015
1 parent c63f31f commit d012cde
Showing 6 changed files with 77 additions and 38 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

val pythonPath = PythonUtils.mergePythonPaths(
PythonUtils.sparkPythonPath,
envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")),
envVars.getOrElse("PYTHONPATH", ""),
sys.env.getOrElse("PYTHONPATH", ""))

def create(): Socket = {
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -52,7 +52,6 @@ object PythonRunner {
pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

// Launch Python process
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -356,6 +356,21 @@ object SparkSubmit {
}
}

    if (args.isPython && clusterManager == YARN) {
      // Zip PySpark from ${SPARK_HOME}/python/pyspark into ${SPARK_HOME}/lib/pyspark.zip
      // and ship it to the executors via YARN.
      for (sparkHome <- sys.env.get("SPARK_HOME")) {
        val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
        val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
        if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) {
          val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip")
            .mkString(File.separator)
          args.files = mergeFileLists(args.files, Utils.resolveURIs(archives.getAbsolutePath),
            py4jPath)
        }
      }
    }

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

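Purely as an illustration of the SparkSubmit block above (this sketch is not part of the commit; the SPARK_HOME value is an assumption), the two archives it appends to args.files would be built roughly like this:

    // Hypothetical values, assuming SPARK_HOME=/opt/spark and the py4j version bundled at the time
    val sparkHome = "/opt/spark"
    val pysparkZip = Seq(sparkHome, "lib", "pyspark.zip").mkString(java.io.File.separator)
    val py4jZip = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(java.io.File.separator)
    // pysparkZip == "/opt/spark/lib/pyspark.zip"
    // py4jZip    == "/opt/spark/python/lib/py4j-0.8.2.1-src.zip"
    // Both are then merged into args.files, the zip after being resolved to a file: URI.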
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.zip.{ZipEntry, ZipOutputStream}
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
import javax.net.ssl.HttpsURLConnection
@@ -2106,6 +2107,56 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

  /**
   * Create a zip archive at `archives` from the contents of `srcFile`, placed under
   * the entry name `rootPath` inside the archive.
   */
  def createZipArchives(archives: File, srcFile: File, rootPath: String): Boolean = {
    var flag = false
    try {
      val fileOutStream = new FileOutputStream(archives)
      val buffOutStream = new BufferedOutputStream(fileOutStream)
      val zipOutStream = new ZipOutputStream(buffOutStream)
      flag = doZip(zipOutStream, rootPath, srcFile)
      zipOutStream.close()
      buffOutStream.close()
      fileOutStream.close()
    } catch {
      case e: FileNotFoundException => logError("File to zip not found")
    }
    flag
  }

  /**
   * Recursively add `file` (and, if it is a directory, its children) to the zip stream
   * under the entry name `curPath`.
   */
  private def doZip(zipOutStream: ZipOutputStream, curPath: String, file: File): Boolean = {
    var flag = false
    if (file.isDirectory) {
      val files = file.listFiles()
      if (files != null && files.length > 0) {
        zipOutStream.putNextEntry(new ZipEntry(curPath + File.separator))
        val nextPath = if (curPath.length == 0) "" else curPath + File.separator
        for (subFile <- files) {
          flag = doZip(zipOutStream, nextPath + subFile.getName, subFile)
        }
      }
    } else {
      zipOutStream.putNextEntry(new ZipEntry(curPath))
      val fileInStream = new FileInputStream(file)
      val buffInStream = new BufferedInputStream(fileInStream)
      val bufSize = 8192
      val buf = new Array[Byte](bufSize)
      var len: Int = buffInStream.read(buf, 0, bufSize)
      while (len != -1) {
        zipOutStream.write(buf, 0, len)
        len = buffInStream.read(buf, 0, bufSize)
      }
      zipOutStream.flush()
      flag = true
      buffInStream.close()
      fileInStream.close()
    }
    flag
  }

}

/**
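A minimal sketch of how the new Utils helper might be exercised (not part of the commit; Utils is private[spark], so this would only compile inside the org.apache.spark package, and the SPARK_HOME layout mirrors the SparkSubmit change above):

    import java.io.File
    import org.apache.spark.util.Utils

    // Zip ${SPARK_HOME}/python/pyspark into ${SPARK_HOME}/lib/pyspark.zip, with "pyspark" as the
    // root entry name inside the archive; reuse the zip if it already exists.
    val sparkHome = sys.env("SPARK_HOME")
    val src = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
    val dest = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
    if (dest.exists() || Utils.createZipArchives(dest, src, "pyspark")) {
      println(s"PySpark archive available at ${dest.getAbsolutePath}")
    }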
40 changes: 10 additions & 30 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -248,7 +248,6 @@ private[spark] class Client(
List(
(SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
(APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
(PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES),
("log4j.properties", oldLog4jConf.orNull, null)
).foreach { case (destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -381,19 +380,26 @@
* This sets up the launch environment, java options, and the command for launching the AM.
*/
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")

val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val launchEnv = setupLaunchEnv(appStagingDir)

// From SPARK-1920 and SPARK-1520 we know PySpark on YARN does not work when the assembly jar is
// packaged by JDK 1.7+, so we ship the PySpark archives to the executors in the same way as the
// assembly jar, and add their paths to PYTHONPATH.
for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) {
launchEnv("PYSPARK_ARCHIVES_PATH") = resPath
var pysparkArchives = new ArrayBuffer[String]()
for ((resLink, res) <- localResources) {
if (resLink.contains("pyspark") || resLink.contains("py4j")) {
pysparkArchives.+=(resLink)
}
}
launchEnv("PYTHONPATH") = pysparkArchives.toArray.mkString(File.pathSeparator)
sparkConf.setExecutorEnv("PYTHONPATH", pysparkArchives.toArray.mkString(File.pathSeparator))

val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
@@ -692,7 +698,6 @@ object Client extends Logging {
// Alias for the Spark assembly jar, the user jar and PySpark archives
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
val PYSPARK_ARCHIVES: String = "__pyspark__.zip"

// URI scheme that identifies local resources
val LOCAL_SCHEME = "local"
@@ -704,9 +709,6 @@
val CONF_SPARK_JAR = "spark.yarn.jar"
val ENV_SPARK_JAR = "SPARK_JAR"

// Location of any user-defined PySpark archives
val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives"

// Internal config to propagate the location of the user's jar to the driver/executors
val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

Expand Down Expand Up @@ -745,28 +747,6 @@ object Client extends Logging {
}
}

/**
* Find the user-defined PySpark archives if configured, or return default.
* The default pyspark.zip is in the same path with assembly jar.
*/
private def pysparkArchives(conf: SparkConf): String = {
if (conf.contains(CONF_PYSPARK_ARCHIVES)) {
conf.get(CONF_PYSPARK_ARCHIVES)
} else {
SparkContext.jarOfClass(this.getClass) match {
case Some(jarPath) =>
val path = new File(jarPath)
val archives = new File(path.getParent + File.separator + "pyspark.zip")
if (archives.exists()) {
archives.getAbsolutePath
} else {
""
}
case None => ""
}
}
}

/**
* Return the path to the given application's staging directory.
*/
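For illustration only (not part of the commit): assuming the shipped files are localized under their base names, the PYTHONPATH assembled in createContainerLaunchContext for the AM and executors would look roughly like this; the link names below are assumptions derived from the file names shipped by the SparkSubmit change:

    // Hypothetical localized resource links visible to the YARN containers
    val localResourceLinks = Seq("pyspark.zip", "py4j-0.8.2.1-src.zip", "__spark__.jar", "__app__.jar")
    val pythonPath = localResourceLinks
      .filter(link => link.contains("pyspark") || link.contains("py4j"))
      .mkString(java.io.File.pathSeparator)
    // pythonPath == "pyspark.zip:py4j-0.8.2.1-src.zip" on Linux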
6 changes: 0 additions & 6 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -299,12 +299,6 @@ class ExecutorRunnable(
}

System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }

// Add PySpark archives path
sys.env.get("PYSPARK_ARCHIVES_PATH") match {
case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath
case None =>
}
env
}
}
