SPARK-1230: [WIP] Enable SparkContext.addJars() to load classes not in CLASSPATH #119
File: SparkContext.scala
@@ -18,7 +18,7 @@
 package org.apache.spark

 import java.io._
-import java.net.URI
+import java.net.{URI, URL}
 import java.util.{Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, SparkURLClassLoader, Utils}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -130,6 +130,16 @@ class SparkContext(

   val isLocal = (master == "local" || master.startsWith("local["))

+  // Create a class loader for use by the driver so that jars added via addJar are available to
+  // the driver. Do this before all other initialization so that any thread pools created for
+  // this SparkContext use the class loader.
+  // Note that this is config-enabled, as class loaders can introduce subtle side effects.
+  private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
+    val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
+    Thread.currentThread.setContextClassLoader(loader)
+    Some(loader)
+  } else None
+
 if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

 // Create the Spark execution environment (cache, map output tracker, etc)

Review thread on this block:

Comment: Will this only work if addJars is called from the thread that created the SparkContext?

Reply: This will capture a pointer to the classloader in which the SC was created, so addJars can be called from anywhere and it will always augment this class loader. I think this means that the class will be visible to (a) the thread that created the sc and (b) any threads created by that thread, though it would be good to verify that the context class loader is passed on to child threads, or that they delegate to that of the parent. This does mean that a thread entirely outside of the SparkContext-creating thread and its children won't have the class loaded. I think that's actually desirable, given that you may have a case where multiple SparkContexts are created in the same JVM.

Reply: I'll defer to @velvia on this one though, as it's his design.

Reply: Ah, ok, I understand now. In that case, to make things simpler, would it possibly make sense to not load the jars into the current thread and only load them for the SparkContext/executors? Classloader stuff can be confusing to deal with, and keeping it as isolated as possible could make things easier for users. This would also line up a little more with how the MR distributed cache works: jars that get added to it don't become accessible to driver code.

Reply: Hey Sandy - not sure what you mean exactly by "load them for the SparkContext". The SparkContext is just a Java object. The scenario we want to handle is like this (sketched below, after this thread). There are two ways "Foo" can be visible for the last line: either it can be included in the classpath when launching the JVM, or it can be added dynamically to the classloader of the calling thread. Is there another way?

Reply: I had misunderstood how the original mechanism worked. I take this all back.
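A minimal sketch of the scenario referenced in that exchange, assuming `Foo` is a class that ships only in the added jar and is not on the driver JVM's launch classpath (all names and paths here are illustrative, and `spark.driver.loadAddedJars` is assumed enabled):

```scala
// Illustrative sketch, not part of the patch.
val sc = new SparkContext("local", "demo")
sc.addJar("/path/to/foo.jar")  // foo.jar contains class Foo

// "Foo" must be visible for this last line. It is resolved through the calling
// thread's context class loader, which addJar has dynamically augmented.
val fooClass = Class.forName("Foo", true, Thread.currentThread.getContextClassLoader)
val foo = fooClass.newInstance()
```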
@@ -726,6 +736,8 @@ class SparkContext(
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+   * NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available
+   * to this SparkContext. local: JARs must be available on the driver node.
    */
   def addJar(path: String) {
     if (path == null) {

Comment: It looks like this comment refers to an outdated name of the config property? (Elsewhere in this patch it is spark.driver.loadAddedJars.)
@@ -767,6 +779,19 @@ class SparkContext(
       case _ =>
         path
     }
+
+    // Add jar to driver class loader so it is available for the driver, even if it is not on the classpath
+    uri.getScheme match {
+      case null | "file" | "local" =>
+        // Assume the file exists on the current (driver) node as well. Unlike executors, the
+        // driver doesn't need to download the jar since it's local.
+        addUrlToDriverLoader(new URL("file:" + uri.getPath))
+      case "http" | "https" | "ftp" =>
+        // Should be handled by the URLClassLoader; pass along the entire URL.
+        addUrlToDriverLoader(new URL(path))
+      case other =>
+        logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader")
+    }
   }
   if (key != null) {
     addedJars(key) = System.currentTimeMillis
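To make the scheme handling concrete, a hedged usage sketch (`sc` is an existing SparkContext with `spark.driver.loadAddedJars` enabled; all paths and hosts are made up):

```scala
// Illustrative only: how each URI scheme is routed by the match above.
sc.addJar("/opt/libs/udfs.jar")               // scheme is null  -> added as file:/opt/libs/udfs.jar
sc.addJar("local:/opt/libs/udfs.jar")         // "local"         -> same treatment; must exist on the driver
sc.addJar("http://repo.example.com/dep.jar")  // "http"          -> whole URL handed to the URLClassLoader
sc.addJar("hdfs://nn:8020/jars/dep.jar")      // anything else   -> executors still fetch it, but the
                                              //                    driver loader only logs a warning
```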
@@ -775,6 +800,15 @@ class SparkContext(
     }
   }

+  private def addUrlToDriverLoader(url: URL) {
+    classLoader.foreach { loader =>
+      if (!loader.getURLs.contains(url)) {
+        logInfo("Adding JAR " + url + " to driver class loader")
+        loader.addURL(url)
+      }
+    }
+  }
+
   /**
    * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
    * any new nodes.
File: TestUtils.scala (new file)
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+
+import java.io.{FileInputStream, FileOutputStream, File}
+import java.util.jar.{JarEntry, JarOutputStream}
+import java.net.{URL, URI}
+import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
+
+object TestUtils {

Review thread on this file:

Comment: Do we really need this, or could we ship a .jar file around for use in tests? I believe we had one for some other tests with addJar.

Reply: We can't include a compiled [...]

Reply: @pwendell this does enable testing more sophisticated things, but especially for testing Scala sources it becomes harder and harder to do this.

Reply: Sure - this can only create Java classes. This is just for tests that want to see visibility of code defined in different jar files.

Reply: By the way, I'm in the middle of creating a new pull request, but it's [...]
+
+  /** Create a jar file that contains this set of files. All files will be located at the root
+    * of the jar. */

Comment: Comment format is wrong.
+  def createJar(files: Seq[File], jarFile: File): URL = {
+    val jarFileStream = new FileOutputStream(jarFile)
+    val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+
+    for (file <- files) {
+      val jarEntry = new JarEntry(file.getName)
+      jarStream.putNextEntry(jarEntry)
+
+      val in = new FileInputStream(file)
+      val buffer = new Array[Byte](10240)
+      var nRead = 0
+      while (nRead <= 0) {
+        nRead = in.read(buffer, 0, buffer.length)
+        jarStream.write(buffer, 0, nRead)
+      }
+      in.close()
+    }
+    jarStream.close()
+    jarFileStream.close()
+
+    jarFile.toURI.toURL
+  }
+
+  // Adapted from the JavaCompiler.java doc examples
+  private val SOURCE = JavaFileObject.Kind.SOURCE
+  private def createURI(name: String) = {
+    URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
+  }
+  private class JavaSourceFromString(val name: String, val code: String)
+    extends SimpleJavaFileObject(createURI(name), SOURCE) {
+    override def getCharContent(ignoreEncodingErrors: Boolean) = code
+  }
+
+  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+  def createCompiledClass(className: String, destDir: File): File = {
+    val compiler = ToolProvider.getSystemJavaCompiler
+    val sourceFile = new JavaSourceFromString(className, s"public class $className {}")
+
+    // Calling this outputs a class file in pwd. It's easier to just rename the file than
+    // build a custom FileManager that controls the output location.
+    compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
+
+    val fileName = className + ".class"
+    val result = new File(fileName)
+    if (!result.exists()) throw new Exception("Compiled file not found: " + fileName)
+    val out = new File(destDir, fileName)
+    result.renameTo(out)
+    out
+  }
+}
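A sketch of how a test might combine these helpers with `addJar` (assumed usage, not taken from the patch; `sc` is an existing SparkContext with `spark.driver.loadAddedJars` enabled, and all names are illustrative):

```scala
import java.io.File

// Assumed test flow: compile a trivial class, jar it up, and add it at runtime.
val tempDir = new File(System.getProperty("java.io.tmpdir"), "spark-addjar-test")
tempDir.mkdirs()
val classFile = TestUtils.createCompiledClass("MyTestClass", tempDir) // emits MyTestClass.class
val jarUrl = TestUtils.createJar(Seq(classFile), new File(tempDir, "test.jar"))

sc.addJar(jarUrl.toString)
// The driver's context class loader (a SparkURLClassLoader) can now resolve the class:
val clazz = Class.forName("MyTestClass", true, Thread.currentThread.getContextClassLoader)
```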
File: docs/configuration.md
@@ -111,6 +111,7 @@ Apart from these, the following properties are also available, and may be useful
   it if you configure your own old generation size.
   </td>
 </tr>
+
 <tr>
   <td>spark.shuffle.memoryFraction</td>
   <td>0.3</td>

Comment: False change?
@@ -375,7 +376,7 @@ Apart from these, the following properties are also available, and may be useful
 <td>spark.akka.heartbeat.interval</td>
 <td>1000</td>
 <td>
-  This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.
+  [same text; the old and new lines differ only in whitespace]
 </td>
 </tr>
 <tr>

Comment: False change?
@@ -392,6 +393,16 @@ Apart from these, the following properties are also available, and may be useful
   Port for the driver to listen on.
   </td>
 </tr>
+<tr>
+  <td>spark.driver.loadAddedJars</td>
+  <td>false</td>
+  <td>
+    If true, the SparkContext uses a class loader to make jars added via `addJar` available to
+    the SparkContext. The default behavior is that jars added via `addJar` are only made
+    available to executors, and Spark apps must include all its jars in the driver's
+    CLASSPATH even if `addJar` is used.
+  </td>
+</tr>
 <tr>
   <td>spark.cleaner.ttl</td>
   <td>(infinite)</td>

Comment: Could the second sentence be simplified to "The default behavior is that jars added via addJar must already be on the classpath."?

Reply: Good call.
@@ -430,7 +441,7 @@ Apart from these, the following properties are also available, and may be useful
 <td>spark.broadcast.blockSize</td>
 <td>4096</td>
 <td>
-  Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>. 
+  Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
   Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
 </td>
 </tr>

Comment: False change?

Reply: Yep, but this removes a trailing space, so it might be good to keep it.
Review thread (on the construction of the driver class loader):

Comment: Maybe you should set its parent to the current thread's context class loader if one exists. Otherwise users who try to add some class loader before starting SparkContext (e.g. if they're in some server environment) will lose it.

Reply: Great catch - this is definitely something that needs to change.

Reply: By the way, I'm pretty sure there is almost no way that Spark contexts can work properly inside a server environment by simply using thread context classloaders, because Spark spins up so many other threads. To make everything work more easily, I believe we should instead have a standard classloader set in SparkEnv or somewhere like that, which can inherit from the thread context of the thread that started SparkContext, but which can be used everywhere else that spins up new threads.
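A hedged sketch of the fix this thread suggests, reusing the `SparkURLClassLoader` constructor from the patch (this is the reviewer's suggestion, not code from the patch):

```scala
// Prefer an existing context class loader as the parent, so a loader installed
// by a hosting server (or other framework) before SparkContext starts is not lost.
val parent = Option(Thread.currentThread.getContextClassLoader)
  .getOrElse(this.getClass.getClassLoader)
val loader = new SparkURLClassLoader(Array.empty[URL], parent)
Thread.currentThread.setContextClassLoader(loader)
```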