[SPARK-6869][PySpark] Add pyspark archives path to PYTHONPATH #5580

Closed
wants to merge 20 commits
Changes from 8 commits
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -328,6 +328,46 @@ object SparkSubmit {
}
}

// In yarn mode for a python app, add pyspark archives to files
// that can be distributed with the job
if (args.isPython && clusterManager == YARN) {
var pyArchives: String = null
if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
Contributor

sys.env("PYSPARK_ARCHIVES_PATH")? Or even:

sys.env.get("PYSPARK_ARCHIVES_PATH").getOrElse(
  // code to figure out where the archives are
)

Contributor Author

I have updated it to use Option, because in getOrElse(default) the default must be a value; it cannot be an expression.

Contributor

Actually that's not true; getOrElse takes a closure (its default is a by-name parameter):

def getOrElse[B >: A](default: ⇒ B): B 

Which is why you can do getOrElse(throw SomeException()) (look for it in Spark's code base).
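
A minimal sketch of that pattern, with an illustrative fallback path rather than the PR's actual logic (java.io.File is already in scope in SparkSubmit, which uses File.separator):

val pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").getOrElse {
  // Only evaluated when PYSPARK_ARCHIVES_PATH is not set.
  Seq(sys.env("SPARK_HOME"), "python", "lib", "pyspark.zip").mkString(File.separator)
}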

} else {
if (!sys.env.contains("SPARK_HOME")) {
printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
}
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
if (!pyArchivesFile.exists()) {
Contributor

I think if we make sure the zip is built as part of the build, then we don't need to create the zip in the code; just require that it's already there.

Contributor Author

I don't think that covers every case. Sometimes we upgrade just by copying spark.jar, so creating the zip in code helps in that situation.

Contributor

I'm not sure I follow. If you just upgrade spark.jar then there is no change to the Python scripts, so you don't need to put a new pyspark.zip there. If there are changes, then you either need to copy over the new Python scripts or put a new pyspark.zip there, and putting a new pyspark.zip there seems easier. Although I guess you need the Python scripts there anyway for client mode, so you probably need both.

In many cases I wouldn't expect a user to have write permissions on the python/lib directory. I would expect that to be a privileged operation. In that case the zip would fail.

Contributor Author

Yes, I agree with you. Thanks.

printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
}
val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
if (!py4jFile.exists()) {
printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
"in yarn mode.")
}
pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
Contributor

pythonPath += pyArchivesFile.getAbsolutePath()?

Same below.

pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pyArchives = pythonPath.mkString(",")
}

pyArchives = pyArchives.split(",").map( localPath=> {
Contributor

Style.

map { localPath =>
  // code
}

val localURI = Utils.resolveURI(localPath)
if (localURI.getScheme != "local") {
args.files = mergeFileLists(args.files, localURI.toString)
(new Path(localPath)).getName
Contributor

nit: parentheses are unnecessary.

} else {
localURI.getPath.toString
Contributor

getPath already returns a String.

}
}).mkString(File.pathSeparator)
sysProps("spark.submit.pyArchives") = pyArchives
}
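
Pulling the style and nit suggestions above together, the mapping block could read roughly like this (a sketch combining the review feedback, not the code committed in this PR):

pyArchives = pyArchives.split(",").map { localPath =>
  val localURI = Utils.resolveURI(localPath)
  if (localURI.getScheme != "local") {
    // Non-local archives are merged into args.files so YARN distributes them;
    // only the file name goes onto the remote PYTHONPATH.
    args.files = mergeFileLists(args.files, localURI.toString)
    new Path(localPath).getName
  } else {
    // local: archives are expected to already exist on every node at this path.
    localURI.getPath
  }
}.mkString(File.pathSeparator)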

// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
1 change: 1 addition & 0 deletions make-distribution.sh
@@ -228,6 +228,7 @@ cp "$SPARK_HOME"/conf/*.template "$DISTDIR"/conf
cp "$SPARK_HOME/README.md" "$DISTDIR"
cp -r "$SPARK_HOME/bin" "$DISTDIR"
cp -r "$SPARK_HOME/python" "$DISTDIR"
zip -r "$DISTDIR"/python/lib/pyspark.zip "$SPARK_HOME"/python/lib/pyspark
Contributor

There's no such thing as "$SPARK_HOME"/python/lib/pyspark.

cp -r "$SPARK_HOME/sbin" "$DISTDIR"
cp -r "$SPARK_HOME/ec2" "$DISTDIR"

12 changes: 12 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer

@@ -341,6 +342,17 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}

// if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
// that can be passed on to the ApplicationMaster and the executors.
if (sparkConf.contains("spark.submit.pyArchives")) {
var pythonPath = sparkConf.get("spark.submit.pyArchives")
if (env.contains("PYTHONPATH")) {
pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
}
env("PYTHONPATH") = pythonPath
sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
Contributor

It feels like there's something missing here.

If the archives are something that is not local:, shouldn't they be uploaded to HDFS and distributed using the cache? Otherwise there's no guarantee that the files exist remotely, right?

Contributor Author

If an archive is not local:, SparkSubmit has already added it to the distributed files. The code is: args.files = mergeFileLists(args.files, localURI.toString). So YARN's Client can guarantee that the distributed files are shipped to the nodes.

}
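
For context, a small self-contained sketch of the PYTHONPATH composition the block above intends, with made-up values for illustration:

import java.io.File

object PythonPathConcatExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical values, for illustration only.
    val existingPythonPath: Option[String] = Some("/opt/extra/python/libs")
    val pyArchives = Seq("pyspark.zip", "py4j-0.8.2.1-src.zip").mkString(File.pathSeparator)

    // Prepend any existing PYTHONPATH before the pyspark archives.
    val pythonPath = existingPythonPath match {
      case Some(existing) => Seq(existing, pyArchives).mkString(File.pathSeparator)
      case None => pyArchives
    }

    // On Linux this prints: /opt/extra/python/libs:pyspark.zip:py4j-0.8.2.1-src.zip
    println(pythonPath)
  }
}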

// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for