[SPARK-6869][PySpark] Add pyspark archives path to PYTHONPATH #5580
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -328,6 +328,46 @@ object SparkSubmit {
      }
    }

    // In yarn mode for a python app, add pyspark archives to files
    // that can be distributed with the job
    if (args.isPython && clusterManager == YARN) {
      var pyArchives: String = null
      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
      } else {
        if (!sys.env.contains("SPARK_HOME")) {
          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
        }
        val pythonPath = new ArrayBuffer[String]
        for (sparkHome <- sys.env.get("SPARK_HOME")) {
          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
          if (!pyArchivesFile.exists()) {
Review comment: I think if we just make sure the zip is built during the build, then we don't need to do the zip in the code. Just require that it is already there.

Reply: I think that has no effect. Sometimes we upgrade just by copying spark.jar, so this is good for that situation.

Review comment: I'm not sure I follow. If you just upgrade spark.jar, then there are no changes to the Python scripts, so you don't need to put a new pyspark.zip there. If there are changes, then you either need to copy over the new Python scripts or put a new pyspark.zip on there. It seems putting a new pyspark.zip there would be easier, although I guess you need the Python scripts there anyway for client mode, so you probably need both. In many cases I wouldn't expect a user to have write permissions on the python/lib directory; I would expect that to be a privileged operation, in which case the zip would fail.

Reply: Yes, I agree with you. Thanks.
            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
          }
          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
          if (!py4jFile.exists()) {
            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
              "in yarn mode.")
          }
          pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
Review comment: Same below.
          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
        }
        pyArchives = pythonPath.mkString(",")
      }

      pyArchives = pyArchives.split(",").map( localPath=> {
Review comment: Style.
        val localURI = Utils.resolveURI(localPath)
        if (localURI.getScheme != "local") {
          args.files = mergeFileLists(args.files, localURI.toString)
          (new Path(localPath)).getName

Review comment: nit: parentheses are unnecessary.
        } else {
          localURI.getPath.toString
        }
      }).mkString(File.pathSeparator)
      sysProps("spark.submit.pyArchives") = pyArchives
    }
    // If we're running a R app, set the main class to our specific R runner
    if (args.isR && deployMode == CLIENT) {
      if (args.primaryResource == SPARKR_SHELL) {
make-distribution.sh
@@ -228,6 +228,7 @@ cp "$SPARK_HOME"/conf/*.template "$DISTDIR"/conf
cp "$SPARK_HOME/README.md" "$DISTDIR"
cp -r "$SPARK_HOME/bin" "$DISTDIR"
cp -r "$SPARK_HOME/python" "$DISTDIR"
zip -r "$DISTDIR"/python/lib/pyspark.zip "$SPARK_HOME"/python/lib/pyspark
Review comment: There's no such thing as "$SPARK_HOME"/python/lib/pyspark.
cp -r "$SPARK_HOME/sbin" "$DISTDIR" | ||
cp -r "$SPARK_HOME/ec2" "$DISTDIR" | ||
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
@@ -341,6 +342,17 @@ private[spark] class Client(
      env("SPARK_YARN_USER_ENV") = userEnvs
    }

    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
    // that can be passed on to the ApplicationMaster and the executors.
    if (sparkConf.contains("spark.submit.pyArchives")) {
      var pythonPath = sparkConf.get("spark.submit.pyArchives")
      if (env.contains("PYTHONPATH")) {
        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
      }
      env("PYTHONPATH") = pythonPath
      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
Review comment: It feels like there's something missing here. If the archives are something that is not local: ...

Reply: If the archives are not local:, SparkSubmit has already put them in the distributed files.
    }

    // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
    // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
    // SparkContext will not let that set spark* system properties, which is expected behavior for
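The merge above simply prepends any PYTHONPATH already present in the container environment. A minimal runnable sketch of that logic, with an immutable map standing in for the YARN environment (the object and method names here are illustrative, not from the patch):

import java.io.File

object PythonPathMerge {
  def merge(env: Map[String, String], pyArchives: String): Map[String, String] = {
    val pythonPath = env.get("PYTHONPATH") match {
      case Some(existing) => Seq(existing, pyArchives).mkString(File.pathSeparator)
      case None           => pyArchives
    }
    env + ("PYTHONPATH" -> pythonPath)
  }

  def main(args: Array[String]): Unit = {
    // pyspark.zip was shipped with the job, so only its name is appended;
    // at runtime YARN localizes it into the container's working directory.
    val env = Map("PYTHONPATH" -> "/opt/libs")
    println(merge(env, Seq("pyspark.zip", "py4j-0.8.2.1-src.zip").mkString(File.pathSeparator)))
  }
}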
Review comment: sys.env("PYSPARK_ARCHIVES_PATH")? Or even: ...

Reply: I have updated it to use Option, because in getOrElse(default), default must be a value; it cannot be an expression.

Review comment: Actually that's not true, getOrElse takes a closure:

    def getOrElse[B >: A](default: => B): B

which is why you can do getOrElse(throw SomeException()) (look for it in Spark's code base).
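A small self-contained demo of the point in that last comment (hypothetical code, not from the PR): the default parameter of Option.getOrElse is by-name, so it is evaluated only when the option is empty, which is why an expression such as a throw is legal there.

object GetOrElseDemo {
  def main(args: Array[String]): Unit = {
    // getOrElse's signature is def getOrElse[B >: A](default: => B): B,
    // so the argument below is never evaluated: the option is defined.
    val set: Option[String] = Some("/opt/spark/python/lib/pyspark.zip")
    println(set.getOrElse(throw new IllegalStateException("unreachable")))

    // With an empty option, the by-name default runs, so this throws.
    val unset: Option[String] = None
    try {
      unset.getOrElse(throw new IllegalStateException("PYSPARK_ARCHIVES_PATH is not set"))
    } catch {
      case e: IllegalStateException => println(s"caught: ${e.getMessage}")
    }
  }
}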