[SPARK-6869][PySpark] Add pyspark archives path to PYTHONPATH #5580
Changes from 5 commits
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader}

 /**
  * Whether to submit, kill, or request the status of an application.
@@ -328,6 +328,42 @@ object SparkSubmit {
       }
     }

+    // In yarn mode for a python app, add pyspark archives to files
+    // that can be distributed with the job
+    if (args.isPython && clusterManager == YARN) {
+      var pyArchives: String = null
+      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
+        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
+      } else {
+        if (!sys.env.contains("SPARK_HOME")) {
+          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+        }
+        val pythonPath = new ArrayBuffer[String]
+        for (sparkHome <- sys.env.get("SPARK_HOME")) {
+          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+          if (!pyArchivesFile.exists()) {
[Review comment] I think if we just make sure the zip is built during the build, then we don't need to do the zip in the code. Just require that it is already there.

[Reply] I don't think that helps. Sometimes we upgrade just by copying over spark.jar, so zipping here is good for that situation.

[Review comment] I'm not sure I follow. If you just upgrade spark.jar, then there are no changes to the Python scripts, so you don't need to put a new pyspark.zip there. If there are changes, then you either need to copy over the new Python scripts or put a new pyspark.zip on there; putting a new pyspark.zip there seems easier. Although I guess you need the Python scripts there anyway for client mode, so you probably need both. In many cases I wouldn't expect a user to have write permissions on the python/lib directory; I would expect that to be a privileged operation. In that case the zip would fail.

[Reply] Yes, I agree with you. Thanks.
+            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
+            Utils.zipRecursive(pySrc, pyArchivesFile)
+          }
+          pythonPath += pyArchivesFile.getAbsolutePath
+          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
+        }
+        pyArchives = pythonPath.mkString(",")
+      }
+
+      pyArchives = pyArchives.split(",").map( localPath=> {
[Review comment] Style.
+        val localURI = Utils.resolveURI(localPath)
+        if (localURI.getScheme != "local") {
+          args.files = mergeFileLists(args.files, localURI.toString)
+          (new Path(localPath)).getName
[Review comment] nit: parentheses are unnecessary.
+        } else {
+          localURI.getPath.toString
+        }
+      }).mkString(File.pathSeparator)
+      sysProps("spark.submit.pyArchives") = pyArchives
+    }
+
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
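For reference, a sketch of how the mapping block above might read once the style and parentheses nits are applied (a braced multi-line closure, no redundant parentheses around new Path(...), no redundant toString). This is only an illustration against the surrounding PR code and reuses its existing pyArchives, args, Utils, mergeFileLists, sysProps, Path, and File symbols:

    // Sketch only: same behavior as the block above, restyled per the review comments.
    pyArchives = pyArchives.split(",").map { localPath =>
      val localURI = Utils.resolveURI(localPath)
      if (localURI.getScheme != "local") {
        // Ship non-local archives with the job and refer to them by file name.
        args.files = mergeFileLists(args.files, localURI.toString)
        new Path(localPath).getName
      } else {
        // "local:" URIs are expected to already be present on every node.
        localURI.getPath
      }
    }.mkString(File.pathSeparator)
    sysProps("spark.submit.pyArchives") = pyArchives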
@@ -361,12 +361,20 @@ object PySparkAssembly {
   // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
   // list since that will copy unneeded / unwanted files.
   resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+    val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+    val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+    IO.delete(zipFile)
+    def entries(f: File):List[File] =
[Review comment] Do you need equivalent changes in the pom file for the Maven build?
+      f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil)
+    IO.zip(entries(src).map(
[Review comment] Could you use the usual style here? The indentation is also a little funny.
+      d => (d, d.getAbsolutePath.substring(src.getParent.length +1))),
+      zipFile)
+
     val dst = new File(outDir, "pyspark")
     if (!dst.isDirectory()) {
       require(dst.mkdirs())
     }
-    val src = new File(BuildCommons.sparkHome, "python/pyspark")
     copy(src, dst)
   }
 )
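In the same spirit as the style comment above, one possible restyling of the zip step (a braced closure and regular indentation); this is only a sketch against the same SparkBuild code, reusing sbt's IO helpers and BuildCommons.sparkHome:

    // Sketch only: build python/lib/pyspark.zip from python/pyspark, naming
    // each entry relative to the parent of the pyspark directory.
    val src = new File(BuildCommons.sparkHome, "python/pyspark")
    val zipFile = new File(BuildCommons.sparkHome, "python/lib/pyspark.zip")
    IO.delete(zipFile)

    def entries(f: File): List[File] =
      f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries) else Nil)

    IO.zip(
      entries(src).map { d =>
        (d, d.getAbsolutePath.substring(src.getParent.length + 1))
      },
      zipFile)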
@@ -326,6 +326,14 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)

+    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
+    // on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      val archives = sparkConf.get("spark.submit.pyArchives")
+      env("PYTHONPATH") = archives
+      sparkConf.setExecutorEnv("PYTHONPATH", archives)
[Review comment] Is this going to overwrite other things possibly on the path, or does it end up appending?
+    }
+
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
     sparkConf.getAll
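On the overwrite-versus-append question above: env("PYTHONPATH") = archives replaces whatever value the env map already holds for that key. A hedged sketch of an appending variant, assuming env is the mutable HashMap of launch environment variables built in this method; File.pathSeparator is used purely for illustration, and the separator appropriate for the YARN nodes may differ:

    // Sketch only: append to any PYTHONPATH entry already present in `env`
    // instead of replacing it.
    if (sparkConf.contains("spark.submit.pyArchives")) {
      val archives = sparkConf.get("spark.submit.pyArchives")
      val pythonPath = env.get("PYTHONPATH") match {
        case Some(existing) => existing + File.pathSeparator + archives
        case None => archives
      }
      env("PYTHONPATH") = pythonPath
      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
    }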
[Review comment] sys.env("PYSPARK_ARCHIVES_PATH")? Or even: …

[Reply] I have updated it to use Option, because in getOrElse(default) the default must be a value; it cannot be an expression.

[Review comment] Actually that's not true, getOrElse takes a closure, which is why you can do getOrElse(throw SomeException()) (look for it in Spark's code base).
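To make the last point concrete: Option.getOrElse declares its default as a by-name parameter, def getOrElse[B >: A](default: => B): B, so the default expression is evaluated only when the Option is empty, and a throw is a legal argument there. A small self-contained sketch (names chosen only for illustration):

    object GetOrElseDemo {
      def main(args: Array[String]): Unit = {
        // The by-name default runs only if the key is missing, so this throws
        // only when SPARK_HOME is not set.
        val sparkHome = sys.env.getOrElse("SPARK_HOME",
          throw new IllegalStateException("SPARK_HOME is not set"))
        println(s"SPARK_HOME = $sparkHome")

        // When the Option is defined, the default expression is never evaluated.
        val zip = Option("pyspark.zip").getOrElse(sys.error("never evaluated"))
        println(zip)
      }
    }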