[SPARK-6869][PySpark] Add pyspark archives path to PYTHONPATH #5580

Status: Closed. Wants to merge 20 commits.
Changes from 5 commits.
38 changes: 37 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader}

/**
* Whether to submit, kill, or request the status of an application.
@@ -328,6 +328,42 @@ object SparkSubmit {
}
}

    // In yarn mode for a python app, add pyspark archives to files
    // that can be distributed with the job
    if (args.isPython && clusterManager == YARN) {
      var pyArchives: String = null
      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
Contributor:
sys.env("PYSPARK_ARCHIVES_PATH")? Or even:

sys.env.get("PYSPARK_ARCHIVES_PATH").getOrElse(
  // code to figure out where the archives are
)

Contributor (author):
I have updated it to use Option, because I thought that in getOrElse(default) the default must be a value and cannot be an expression.

Contributor:
Actually that's not true; getOrElse takes a by-name parameter (a closure):

def getOrElse[B >: A](default: => B): B

which is why you can do getOrElse(throw SomeException()) (look for it in Spark's code base).
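For instance, a minimal generic sketch (not this PR's code) of passing a whole block as the default:

// The block is evaluated only when the key is missing; it can even end in a throw.
val sparkHome = sys.env.get("SPARK_HOME").getOrElse {
  throw new IllegalStateException("SPARK_HOME is not set")
}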

      } else {
        if (!sys.env.contains("SPARK_HOME")) {
          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
        }
        val pythonPath = new ArrayBuffer[String]
        for (sparkHome <- sys.env.get("SPARK_HOME")) {
          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
          if (!pyArchivesFile.exists()) {
Contributor:
I think if we just make sure the zip is built during the build, then we don't need to do the zipping in the code; just require that it's already there.

Contributor (author):
I don't think that covers everything; sometimes we upgrade just by copying spark.jar, so zipping here handles that situation.

Contributor:
I'm not sure I follow. If you just upgrade spark.jar, then there is no change to the Python scripts, so you don't need to put a new pyspark.zip there. If there are changes, then you either need to copy over the new Python scripts or put a new pyspark.zip there, and putting a new pyspark.zip there seems easier. Although I guess you need the Python scripts there anyway for client mode, so you probably need both.

In many cases I wouldn't expect a user to have write permissions on the python/lib directory; I would expect that to be a privileged operation. In that case the zip would fail.

Contributor (author):
Yes, I agree with you. Thanks.

            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
            Utils.zipRecursive(pySrc, pyArchivesFile)
          }
          pythonPath += pyArchivesFile.getAbsolutePath
          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
        }
        pyArchives = pythonPath.mkString(",")
      }

      pyArchives = pyArchives.split(",").map( localPath=> {
Contributor:
Style.

map { localPath =>
  // code
}

        val localURI = Utils.resolveURI(localPath)
        if (localURI.getScheme != "local") {
          args.files = mergeFileLists(args.files, localURI.toString)
          (new Path(localPath)).getName
Contributor:
nit: parentheses are unnecessary.

        } else {
          localURI.getPath.toString
Contributor:
getPath already returns a String.

        }
      }).mkString(File.pathSeparator)
      sysProps("spark.submit.pyArchives") = pyArchives
    }
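Pulling the review suggestions above together (sys.env fallback via a getOrElse block, map { } style, no extra parentheses, getPath without toString, and assuming pyspark.zip is produced at build time rather than zipped here), a hedged sketch of how this block could end up looking; sys.error stands in for printErrorAndExit so the fallback type-checks as a String, and this is an illustration, not the final committed code:

    if (args.isPython && clusterManager == YARN) {
      // Resolve the archives either from PYSPARK_ARCHIVES_PATH or from SPARK_HOME/python/lib.
      val pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").getOrElse {
        val sparkHome = sys.env.getOrElse("SPARK_HOME",
          sys.error("SPARK_HOME does not exist for python application in yarn mode."))
        val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
        Seq(new File(pyLibPath, "pyspark.zip").getAbsolutePath,
          Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)).mkString(",")
      }

      // Ship non-"local:" archives with the job and record the resulting PYTHONPATH entries.
      sysProps("spark.submit.pyArchives") = pyArchives.split(",").map { localPath =>
        val localURI = Utils.resolveURI(localPath)
        if (localURI.getScheme != "local") {
          args.files = mergeFileLists(args.files, localURI.toString)
          new Path(localPath).getName
        } else {
          localURI.getPath
        }
      }.mkString(File.pathSeparator)
    }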

    // If we're running a R app, set the main class to our specific R runner
    if (args.isR && deployMode == CLIENT) {
      if (args.primaryResource == SPARKR_SHELL) {
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util

import java.io._
import java.util.zip.{ZipOutputStream, ZipEntry}
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
@@ -1000,6 +1001,40 @@ private[spark] object Utils extends Logging {
    !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
  }

  /**
   * Recursively add `source` (a file or directory) and its contents to the zip output stream.
   */
  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
    if (source.isDirectory()) {
      // Add an entry for the directory itself, then recurse into its children.
      output.putNextEntry(new ZipEntry(parent + source.getName()))
      for (file <- source.listFiles()) {
        addFilesToZip(parent + source.getName + File.separator, file, output)
      }
    } else {
      // Copy the file's bytes into a new zip entry.
      val in = new FileInputStream(source)
      output.putNextEntry(new ZipEntry(parent + source.getName()))
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          output.write(buf, 0, n)
        }
      }
      in.close()
    }
  }

  /**
   * Zip the `source` file or directory recursively into `destZipFile`.
   */
  def zipRecursive(source: File, destZipFile: File): Unit = {
    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
    addFilesToZip("", source, destOutput)
    destOutput.flush()
    destOutput.close()
  }
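A hypothetical usage, mirroring the SparkSubmit call above (sparkHome here stands in for a resolved SPARK_HOME):

// Zip SPARK_HOME/python/pyspark into SPARK_HOME/python/lib/pyspark.zip.
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
Utils.zipRecursive(pySrc, new File(pyLibPath, "pyspark.zip"))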

  /**
   * Determines if a directory contains any files newer than cutoff seconds.
   *
12 changes: 10 additions & 2 deletions project/SparkBuild.scala
@@ -361,12 +361,20 @@ object PySparkAssembly {
// to be included in the assembly. We can't just add "python/" to the assembly's resource dir
// list since that will copy unneeded / unwanted files.
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
val src = new File(BuildCommons.sparkHome, "python/pyspark")

val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
IO.delete(zipFile)
Contributor:
zipFile.delete()? What's different about IO.delete?

def entries(f: File):List[File] =
Contributor:
Do you need equivalent changes in the pom file for the Maven build?

f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil)
IO.zip(entries(src).map(
Contributor:
Could you use the usual style here?

.map { foo => code }

The indentation is also a little funny (zipFile looks like an argument to map when it's not).

d => (d, d.getAbsolutePath.substring(src.getParent.length +1))),
zipFile)

val dst = new File(outDir, "pyspark")
if (!dst.isDirectory()) {
require(dst.mkdirs())
}

val src = new File(BuildCommons.sparkHome, "python/pyspark")
copy(src, dst)
}
)
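A sketch of what the reviewer's style suggestion might look like for the zip step (same behavior, just the usual .map { } form and clearer indentation; IO.zip and IO.listFiles are the sbt helpers already used above):

def entries(f: File): List[File] =
  f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries) else Nil)

IO.zip(
  entries(src).map { d =>
    (d, d.getAbsolutePath.substring(src.getParent.length + 1))
  },
  zipFile)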
8 changes: 8 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -326,6 +326,14 @@ private[spark] class Client(
    distCacheMgr.setDistFilesEnv(env)
    distCacheMgr.setDistArchivesEnv(env)

    // If spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
    // on to the ApplicationMaster and the executors.
    if (sparkConf.contains("spark.submit.pyArchives")) {
      val archives = sparkConf.get("spark.submit.pyArchives")
      env("PYTHONPATH") = archives
      sparkConf.setExecutorEnv("PYTHONPATH", archives)
Contributor:
Is this going to overwrite other things that might already be on the path, or does it end up appending?
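One way it could append rather than overwrite (a sketch only; env is the launch-environment HashMap this method builds, and using File.pathSeparator here glosses over YARN's own separator handling):

// Append the archives to any PYTHONPATH already present in the AM environment.
val pythonPath = (env.get("PYTHONPATH").toSeq :+ archives).mkString(File.pathSeparator)
env("PYTHONPATH") = pythonPath
sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)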

    }

    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
    val amEnvPrefix = "spark.yarn.appMasterEnv."
    sparkConf.getAll