[SPARK-6869][PySpark] Add pyspark archives path to PYTHONPATH #5580

Closed · wants to merge 20 commits
21 changes: 21 additions & 0 deletions assembly/pom.xml
@@ -92,6 +92,27 @@
<skip>true</skip>
</configuration>
</plugin>
<!-- zip pyspark archives to run python application on yarn mode -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<target>
<delete dir="${basedir}/../python/lib/pyspark.zip"/>
<zip destfile="${basedir}/../python/lib/pyspark.zip">
<fileset dir="${basedir}/../python/" includes="pyspark/**/*"/>
</zip>
</target>
</configuration>
</plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -332,6 +332,47 @@ object SparkSubmit {
}
}

// In yarn mode for a python app, add pyspark archives to files
// that can be distributed with the job
if (args.isPython && clusterManager == YARN) {
var pyArchives: String = null
val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
if (pyArchivesEnvOpt.isDefined) {
pyArchives = pyArchivesEnvOpt.get
} else {
if (!sys.env.contains("SPARK_HOME")) {
printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
}
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
if (!pyArchivesFile.exists()) {
Contributor:

I think if we just make sure the zip is built during the build, then we don't need to do the zipping in the code. Just require that it's already there.

Contributor Author:

I think that wouldn't help on its own. Sometimes we upgrade just by copying spark.jar over, so doing the zip in the code is good for that situation.

Contributor:

I'm not sure I follow. If you just upgrade spark.jar then there are no changes to the python scripts, so you don't need to put a new pyspark.zip there. If there are changes, then you either need to copy over the new python scripts or put a new pyspark.zip there; putting a new pyspark.zip there seems easier. Although I guess you need the python scripts there anyway for client mode, so you probably need both.

In many cases I wouldn't expect a user to have write permissions on the python/lib directory; I would expect that to be a privileged operation. In that case creating the zip would fail.

Contributor Author:

Yes, I agree with you. Thanks.
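
(For readers following this thread: below is a minimal sketch, not the code in this PR, of the approach the discussion converges on. The archives are produced at build time and spark-submit only verifies that they exist, with PYSPARK_ARCHIVES_PATH as an override for users who cannot write to python/lib. The helper name resolvePySparkArchives is made up for illustration.)

import java.io.File

// Sketch: resolve the PySpark archives that will be shipped to YARN.
// 1. An explicit PYSPARK_ARCHIVES_PATH wins, e.g. archives already staged somewhere readable.
// 2. Otherwise the zips produced by the build must already exist under SPARK_HOME/python/lib.
def resolvePySparkArchives(env: Map[String, String]): Seq[String] = {
  env.get("PYSPARK_ARCHIVES_PATH") match {
    case Some(paths) => paths.split(",").toSeq
    case None =>
      val sparkHome = env.getOrElse("SPARK_HOME",
        sys.error("SPARK_HOME must be set for Python applications in YARN mode"))
      val pyLib = new File(new File(sparkHome, "python"), "lib")
      Seq("pyspark.zip", "py4j-0.8.2.1-src.zip").map { name =>
        val archive = new File(pyLib, name)
        // Fail fast at submit time; the zip is created by the build, never here.
        require(archive.exists(), s"$name is missing under $pyLib; it should be produced by the build")
        archive.getAbsolutePath
      }
  }
}

The actual change below has the same shape: check PYSPARK_ARCHIVES_PATH first, then error out if the prebuilt pyspark.zip or py4j zip is missing.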

printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
}
val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
if (!py4jFile.exists()) {
printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
"in yarn mode.")
}
pythonPath += pyArchivesFile.getAbsolutePath()
pythonPath += py4jFile.getAbsolutePath()
}
pyArchives = pythonPath.mkString(",")
}

pyArchives = pyArchives.split(",").map { localPath=>
val localURI = Utils.resolveURI(localPath)
if (localURI.getScheme != "local") {
args.files = mergeFileLists(args.files, localURI.toString)
new Path(localPath).getName
} else {
localURI.getPath
}
}.mkString(File.pathSeparator)
sysProps("spark.submit.pyArchives") = pyArchives
}

// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
37 changes: 35 additions & 2 deletions project/SparkBuild.scala
@@ -363,23 +363,56 @@ object Assembly {
object PySparkAssembly {
import sbtassembly.Plugin._
import AssemblyKeys._
import java.util.zip.{ZipOutputStream, ZipEntry}

lazy val settings = Seq(
unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" },
// Use a resource generator to copy all .py files from python/pyspark into a managed directory
// to be included in the assembly. We can't just add "python/" to the assembly's resource dir
// list since that will copy unneeded / unwanted files.
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
val src = new File(BuildCommons.sparkHome, "python/pyspark")

val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
zipFile.delete()
zipRecursive(src, zipFile)

val dst = new File(outDir, "pyspark")
if (!dst.isDirectory()) {
require(dst.mkdirs())
}

val src = new File(BuildCommons.sparkHome, "python/pyspark")
copy(src, dst)
}
)

private def zipRecursive(source: File, destZipFile: File) = {
val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
addFilesToZipStream("", source, destOutput)
destOutput.flush()
destOutput.close()
}

private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
if (source.isDirectory()) {
output.putNextEntry(new ZipEntry(parent + source.getName()))
for (file <- source.listFiles()) {
addFilesToZipStream(parent + source.getName() + File.separator, file, output)
}
} else {
val in = new FileInputStream(source)
output.putNextEntry(new ZipEntry(parent + source.getName()))
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
output.write(buf, 0, n)
}
}
in.close()
}
}

private def copy(src: File, dst: File): Seq[File] = {
src.listFiles().flatMap { f =>
val child = new File(dst, f.getName())
23 changes: 17 additions & 6 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -468,6 +468,17 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}

// if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
// that can be passed on to the ApplicationMaster and the executors.
if (sparkConf.contains("spark.submit.pyArchives")) {
var pythonPath = sparkConf.get("spark.submit.pyArchives")
if (env.contains("PYTHONPATH")) {
pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
}
env("PYTHONPATH") = pythonPath
sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
Contributor:

It feels there's something missing here.

If the archives are something that is not local:, shouldn't they be uploaded to HDFS and distributed using the cache? Otherwise there's no guarantee that the files exist remotely, right?

Contributor Author:

If the archives are not local:, SparkSubmit has already added them to the distributed files (the code is args.files = mergeFileLists(args.files, localURI.toString)), so the YARN client can guarantee that those files are distributed to the nodes.
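
(To make that concrete, here is a minimal sketch of the scheme-based split described above; toPythonPath and addToDistFiles are hypothetical names, not the PR's code. Archives whose URI scheme is not local: are handed to the distributed-file list so the YARN client ships them, while local: archives are assumed to exist on every node and are referenced in place.)

import java.io.File
import java.net.URI

def toPythonPath(pyArchives: Seq[String], addToDistFiles: String => Unit): String = {
  pyArchives.map { path =>
    // The PR uses Utils.resolveURI, which also turns bare paths into file: URIs.
    val uri = new URI(path)
    if (uri.getScheme != "local") {
      addToDistFiles(uri.toString)   // shipped by the YARN client along with the other dist files
      new File(uri.getPath).getName  // only the basename is visible in the remote container
    } else {
      uri.getPath                    // already present on every node; use the absolute path
    }
  }.mkString(File.pathSeparator)
}

On YARN, distributed files are localized into each container's working directory under their base name, which is why only the file name, not the full submit-side path, is appended to PYTHONPATH for non-local archives.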

}

// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
@@ -1074,7 +1085,7 @@ object Client extends Logging {
val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")

val hiveConfGet = (param:String) => Option(hiveConfClass
val hiveConfGet = (param: String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))

@@ -1096,7 +1107,7 @@

val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
@@ -1141,13 +1152,13 @@

logInfo("Added HBase security token to credentials.")
} catch {
case e:java.lang.NoSuchMethodException =>
case e: java.lang.NoSuchMethodException =>
logInfo("HBase Method not found: " + e)
case e:java.lang.ClassNotFoundException =>
case e: java.lang.ClassNotFoundException =>
logDebug("HBase Class not found: " + e)
case e:java.lang.NoClassDefFoundError =>
case e: java.lang.NoClassDefFoundError =>
logDebug("HBase Class not found: " + e)
case e:Exception =>
case e: Exception =>
logError("Exception when obtaining HBase security token: " + e)
}
}