SPARK-1230: [WIP] Enable SparkContext.addJars() to load classes not in CLASSPATH #119

Closed (wants to merge 15 commits)

Changes from 13 commits
38 changes: 36 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io._
import java.net.URI
import java.net.{URI, URL}
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, SparkURLClassLoader, Utils}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -130,6 +130,16 @@ class SparkContext(

val isLocal = (master == "local" || master.startsWith("local["))

// Create a classLoader for use by the driver so that jars added via addJar are available to the
// driver. Do this before all other initialization so that any thread pools created for this
// SparkContext uses the class loader.
// Note that this is config-enabled as classloaders can introduce subtle side effects
private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
Contributor:
Maybe you should set its parent to the current thread's context class loader if one exists. Otherwise users who try to add some class loader before starting SparkContext (e.g. if they're in some server environment) will lose it.
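
A minimal sketch of that suggestion (illustrative only, not what the diff above does; it assumes the imports already present in SparkContext.scala):

```scala
// Sketch: prefer the calling thread's context class loader as the parent of the
// driver's SparkURLClassLoader, and fall back to this class's loader if none is set.
val parent = Option(Thread.currentThread.getContextClassLoader)
  .getOrElse(this.getClass.getClassLoader)
val loader = new SparkURLClassLoader(Array.empty[URL], parent)
```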

Contributor Author:
Great catch - this is definitely something that needs to change.

Contributor:
By the way, I'm pretty sure there is almost no way that Spark contexts can work properly inside a server environment simply by using Thread context classloaders. The reason is that Spark spins up so many other threads. To make everything easier to get working, I believe we should instead have a standard classloader set in SparkEnv or somewhere like that, which can inherit from the Thread context classloader in the thread that started the SparkContext, but which can be used everywhere else that spins up new threads.
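
A rough sketch of that idea, with hypothetical names (`appClassLoader` is not an existing SparkEnv field):

```scala
// Hypothetical: capture the creator thread's context class loader once, when the
// SparkContext is constructed, and expose it so that threads Spark spins up later
// can resolve user classes through it explicitly.
class SparkEnvSketch {
  val appClassLoader: ClassLoader =
    Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
}

// A Spark-managed thread would then use env.appClassLoader rather than relying on
// its own (possibly unset or different) context class loader.
```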

Thread.currentThread.setContextClassLoader(loader)
Contributor:
Will this only work if addJars is called from the thread that created the SparkContext?

Contributor Author:
This will capture a pointer to the classloader in which the SC was created. So addJars can be called from anywhere and it will always augment this class loader.

I think this means that the class will be visible to (a) the thread that created the sc and (b) any threads created by that thread. Though it would be good to verify that the context class loader is passed on to child threads, or that they delegate to that of the parent.

This does mean that a thread entirely outside of the SparkContext-creating thread and its children won't have the class loaded. I think that's actually desirable given that you may have a case where multiple SparkContexts are created in the same JVM.
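
For reference, a tiny standalone check (not part of this PR) of whether a child thread inherits its creator's context class loader:

```scala
object ContextLoaderInheritanceCheck {
  def main(args: Array[String]): Unit = {
    val parentLoader = Thread.currentThread.getContextClassLoader
    var childLoader: ClassLoader = null
    val t = new Thread(new Runnable {
      def run(): Unit = { childLoader = Thread.currentThread.getContextClassLoader }
    })
    t.start()
    t.join()
    // The JDK copies the creator's context class loader into new threads unless it
    // is explicitly overridden, so this should print true.
    println(childLoader eq parentLoader)
  }
}
```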

Contributor Author:
I'll defer to @velvia on this one though as it's his design.

Contributor:
Ah, ok, I understand now. In that case, to make things simpler, would it possibly make sense to not load the jars into the current thread and only load them for the SparkContext/executors? Classloader stuff can be confusing to deal with, and keeping it as isolated as possible could make things easier for users. This would also line up a little more with how the MR distributed cache works - jars that get added to it don't become accessible to driver code.

Contributor Author:
Hey Sandy - not sure what you mean exactly by "load them for the SparkContext". The SparkContext is just a Java object. The scenario we want to handle is like this:

val sc = new SparkContext(...)
sc.addJar("jar-containing-lib-foo")
val x: Seq[Foo] = sc.textFile(...).map(...).collect()

There are two ways "Foo" can be visible for the last line. Either it can be included in the classpath when launching the JVM or it can be added dynamically to the classloader of the calling thread. Is there another way?
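
To make the scenario concrete, here is a hedged sketch of how the new config would be used (the jar path and class name are placeholders; `Class.forName` is used because `Foo` is not on the compile-time classpath):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumes /path/to/foo.jar contains a class named Foo that is NOT on the
// classpath the JVM was launched with.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("addJar-driver-visibility")
  .set("spark.driver.loadAddedJars", "true") // flag added by this PR

val sc = new SparkContext(conf)
sc.addJar("/path/to/foo.jar")

// With the flag enabled, Foo is now resolvable through the context class loader
// of the thread that created the SparkContext.
val fooClass = Class.forName("Foo", true, Thread.currentThread.getContextClassLoader)

sc.stop()
```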

Contributor:
I had misunderstood how the original mechanism worked. I take this all back.

Some(loader)
} else None

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// Create the Spark execution environment (cache, map output tracker, etc)
@@ -726,6 +736,8 @@ class SparkContext(
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available
Contributor:
It looks like this comment refers to an outdated name of the config property?

* to this SparkContext. local: JARs must be available on the driver node.
*/
def addJar(path: String) {
if (path == null) {
@@ -767,6 +779,19 @@
case _ =>
path
}

// Add jar to driver class loader so it is available for driver, even if it is not on the classpath
uri.getScheme match {
case null | "file" | "local" =>
// Assume file exists on current (driver) node as well. Unlike executors, driver doesn't need to
// download the jar since it's local.
addUrlToDriverLoader(new URL("file:" + uri.getPath))
case "http" | "https" | "ftp" =>
// Should be handled by the URLClassLoader, pass along entire URL
addUrlToDriverLoader(new URL(path))
case other =>
logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader")
}
}
if (key != null) {
addedJars(key) = System.currentTimeMillis
@@ -775,6 +800,15 @@
}
}

private def addUrlToDriverLoader(url: URL) {
classLoader.foreach { loader =>
if (!loader.getURLs.contains(url)) {
logInfo("Adding JAR " + url + " to driver class loader")
loader.addURL(url)
}
}
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -123,6 +123,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {

val classLoader = Thread.currentThread.getContextClassLoader

val securityManager = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
securityManager = securityManager)
@@ -133,8 +135,6 @@
conf.set("spark.driver.port", boundPort.toString)
}

val classLoader = Thread.currentThread.getContextClassLoader

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
@@ -101,7 +101,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
indestructible = true, conf = conf, securityManager = new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.util.{Utils, SparkURLClassLoader}

/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -295,15 +295,15 @@ private[spark] class Executor(
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
private def createClassLoader(): SparkURLClassLoader = {
val loader = this.getClass.getClassLoader

// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
val urls = currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}.toArray
new ExecutorURLClassLoader(urls, loader)
new SparkURLClassLoader(urls, loader)
}

/**
@@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
taskSetManager.abort("ClassNotFound [" + cnf.getMessage + "] with classloader: " + loader)
case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.executor
package org.apache.spark.util

import java.net.{URLClassLoader, URL}

/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
*/
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {

override def addURL(url: URL) {
24 changes: 4 additions & 20 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
override def beforeEach() {
super.beforeEach()
resetSparkContext()
System.setProperty("spark.authenticate", "false")
}

override def beforeAll() {
super.beforeAll()
System.setProperty("spark.authenticate", "false")

val tmpDir = new File(Files.createTempDir(), "test")
tmpDir.mkdir()

@@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
pw.close()

val jarFile = new File(tmpDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)

val in = new FileInputStream(textFile)
val buffer = new Array[Byte](10240)
var nRead = 0
while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jar.write(buffer, 0, nRead)
}

in.close()
jar.close()
jarStream.close()
val jarUrl = TestUtils.createJar(Seq(textFile), jarFile)

tmpFile = textFile
tmpJarUrl = jarFile.toURI.toURL.toString
tmpJarUrl = jarUrl.toString
}

test("Distributing files locally") {
23 changes: 23 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -29,8 +29,31 @@ import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import scala.util.Try

class FileSuite extends FunSuite with LocalSparkContext {
test("adding jars to classpath at the driver") {
val tmpDir = Files.createTempDir()
val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)
val jarFile = new File(tmpDir, "test.jar")
TestUtils.createJar(Seq(classFile), jarFile)

def canLoadClass(clazz: String) =
Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess

val driverLoadedBefore = canLoadClass("HelloSpark")

val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test")
.set("spark.driver.loadAddedJars", "true")

val sc = new SparkContext(conf)
sc.addJar(jarFile.getAbsolutePath)

val driverLoadedAfter = canLoadClass("HelloSpark")

assert(false === driverLoadedBefore, "Class visible before being added")
assert(true === driverLoadedAfter, "Class was not visible after being added")
}

test("text files") {
sc = new SparkContext("local", "test")
80 changes: 80 additions & 0 deletions core/src/test/scala/org/apache/spark/TestUtils.scala
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.JavaConversions._

import java.io.{FileInputStream, FileOutputStream, File}
import java.util.jar.{JarEntry, JarOutputStream}
import java.net.{URL, URI}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

object TestUtils {
Contributor:
Do we really need this or could we ship a .jar file around for use in tests? I believe we had one for some other tests with addJar.

Contributor Author:
We can't include a compiled .jar file in the Spark repo but we could create a project in the build that builds a test jar. I thought this was strictly better since we can test more sophisticated things down the road given this mechanism.

Contributor:
@pwendell this does enable testing more sophisticated things, but especially for testing Scala sources it becomes harder and harder to do this.

Contributor Author:
Sure - this can only create java classes. This is just for tests that want to see visibility of code defined in different jar files.

Contributor:
By the way, I'm in the middle of creating a new pull request, but it's taking a while.


/** Create a jar file that contains this set of files. All files will be located at the root
* of the jar. */
Contributor:
Comment format is wrong
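
(For reference, a conventional Scaladoc layout for that comment would be something like the following sketch:)

```scala
/**
 * Create a jar file that contains this set of files. All files will be located
 * at the root of the jar.
 */
```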

def createJar(files: Seq[File], jarFile: File): URL = {
val jarFileStream = new FileOutputStream(jarFile)
val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())

for (file <- files) {
val jarEntry = new JarEntry(file.getName)
jarStream.putNextEntry(jarEntry)

val in = new FileInputStream(file)
val buffer = new Array[Byte](10240)
var nRead = 0
while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jarStream.write(buffer, 0, nRead)
}
in.close()
}
jarStream.close()
jarFileStream.close()

jarFile.toURI.toURL
}

// Adapted from the JavaCompiler.java doc examples
private val SOURCE = JavaFileObject.Kind.SOURCE
private def createURI(name: String) = {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}
private class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean) = code
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(className: String, destDir: File): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className, s"public class $className {}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()

val fileName = className + ".class"
val result = new File(fileName)
if (!result.exists()) throw new Exception("Compiled file not found: " + fileName)
val out = new File(destDir, fileName)
result.renameTo(out)
out
}
}
3 changes: 2 additions & 1 deletion docs/cluster-overview.md
@@ -64,7 +64,8 @@ and `addFile`.
- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
or shared via NFS, GlusterFS, etc.
or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set,
then the file must be visible to the node running the SparkContext as well.

Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
Over time this can use up a significant amount of space and will need to be cleaned up.
15 changes: 13 additions & 2 deletions docs/configuration.md
@@ -111,6 +111,7 @@ Apart from these, the following properties are also available, and may be useful
it if you configure your own old generation size.
</td>
</tr>

Contributor:
False change?

<tr>
<td>spark.shuffle.memoryFraction</td>
<td>0.3</td>
@@ -375,7 +376,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.akka.heartbeat.interval</td>
<td>1000</td>
<td>
This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.
This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.
Contributor:
False change?

</td>
</tr>
<tr>
@@ -392,6 +393,16 @@ Apart from these, the following properties are also available, and may be useful
Port for the driver to listen on.
</td>
</tr>
<tr>
<td>spark.driver.loadAddedJars</td>
<td>false</td>
<td>
If true, the SparkContext uses a class loader to make jars added via `addJar` available to
Contributor:
Could the second sentence be simplified to "The default behavior is that jars added via addJar must already be on the classpath."?

Contributor Author:
Good call.

the SparkContext. The default behavior is that jars added via `addJar` are only made
available to executors, and Spark apps must include all its jars in the driver's
CLASSPATH even if `addJar` is used.
</td>
</tr>
<tr>
<td>spark.cleaner.ttl</td>
<td>(infinite)</td>
@@ -430,7 +441,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.broadcast.blockSize</td>
<td>4096</td>
<td>
Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Contributor:
False change?

Contributor Author:
Yep but this removes a trailing space so might be good to keep it.

Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>