diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 745e3fa4e85f6..5ec949edf090e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,7 @@ package org.apache.spark
 
 import java.io._
-import java.net.URI
+import java.net.{URI, URL}
 import java.util.{Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, SparkURLClassLoader, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -130,6 +130,18 @@ class SparkContext(
   val isLocal = (master == "local" || master.startsWith("local["))
 
+  // Create a class loader for use by the driver so that jars added via addJar are available to
+  // the driver. Do this before all other initialization so that any thread pools created for
+  // this SparkContext use the class loader.
+  // In the future it might make sense to expose this to users so they can assign it as the
+  // context class loader for other threads.
+  // Note that this is gated by a config flag, since class loaders can introduce subtle side effects.
+  private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
+    val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
+    Thread.currentThread.setContextClassLoader(loader)
+    Some(loader)
+  } else None
+
   if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
   // Create the Spark execution environment (cache, map output tracker, etc)
@@ -726,6 +738,8 @@ class SparkContext(
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+   * NOTE: If you enable spark.driver.loadAddedJars, the JAR will also be made available to
+   * this SparkContext and its child threads. local: JARs must be available on the driver node.
    */
   def addJar(path: String) {
     if (path == null) {
@@ -767,6 +781,20 @@ class SparkContext(
         case _ => path
       }
+
+      // Add the jar to the driver's class loader so it is available to the driver,
+      // even if it is not on the classpath.
+      uri.getScheme match {
+        case null | "file" | "local" =>
+          // Assume the file exists on the current (driver) node as well. Unlike executors,
+          // the driver doesn't need to download the jar since it's local.
+          addUrlToDriverLoader(new URL("file:" + uri.getPath))
+        case "http" | "https" | "ftp" =>
+          // Should be handled by the URLClassLoader, pass along the entire URL.
+          addUrlToDriverLoader(new URL(path))
+        case other =>
+          logWarning(s"URI scheme $other of JAR $path is not supported by the driver class loader")
+      }
     }
     if (key != null) {
       addedJars(key) = System.currentTimeMillis
@@ -775,6 +803,15 @@ class SparkContext(
     }
   }
 
+  private def addUrlToDriverLoader(url: URL) {
+    classLoader.foreach { loader =>
+      if (!loader.getURLs.contains(url)) {
+        logInfo("Adding JAR " + url + " to driver class loader")
+        loader.addURL(url)
+      }
+    }
+  }
+
   /**
    * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
   * any new nodes.
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 5e43b5198422c..57cf2ab502473 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -123,6 +123,8 @@ object SparkEnv extends Logging {
       isDriver: Boolean,
       isLocal: Boolean): SparkEnv = {
 
+    val classLoader = Thread.currentThread.getContextClassLoader
+
     val securityManager = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
@@ -133,8 +135,6 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", boundPort.toString)
     }
 
-    val classLoader = Thread.currentThread.getContextClassLoader
-
     // Create an instance of the class named by the given Java system property, or by
     // defaultClassName if the property is not set, and return it as a T
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3486092a140fb..bc71b32ee8f11 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -101,7 +101,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
+      indestructible = true, conf = conf, securityManager = new SecurityManager(conf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e69f6f72d3275..bd1e22e228e8c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, SparkURLClassLoader}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -295,7 +295,7 @@ private[spark] class Executor(
    * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
    * created by the interpreter to the search path
    */
-  private def createClassLoader(): ExecutorURLClassLoader = {
+  private def createClassLoader(): SparkURLClassLoader = {
     val loader = this.getClass.getClassLoader
 
     // For each of the jars in the jarSet, add them to the class loader.
@@ -303,7 +303,7 @@ private[spark] class Executor(
     val urls = currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }.toArray
-    new ExecutorURLClassLoader(urls, loader)
+    new SparkURLClassLoader(urls, loader)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index cb4ad4ae9350c..1747add15dd9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       } catch {
         case cnf: ClassNotFoundException =>
           val loader = Thread.currentThread.getContextClassLoader
-          taskSetManager.abort("ClassNotFound with classloader: " + loader)
+          taskSetManager.abort(s"ClassNotFound [${cnf.getMessage}] with classloader: " + loader)
         case ex: Throwable =>
           taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
       }
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
similarity index 89%
rename from core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
rename to core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
index f9bfe8ed2f5ba..19134aca496f4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.executor
+package org.apache.spark.util
 
 import java.net.{URLClassLoader, URL}
 
 /**
  * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
  */
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
+private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader)
   extends URLClassLoader(urls, parent) {
 
   override def addURL(url: URL) {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index aee9ab9091dac..9fb74341df0ff 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
     super.beforeAll()
+    System.setProperty("spark.authenticate", "false")
+
     val tmpDir = new File(Files.createTempDir(), "test")
     tmpDir.mkdir()
@@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     pw.close()
 
     val jarFile = new File(tmpDir, "test.jar")
-    val jarStream = new FileOutputStream(jarFile)
-    val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
-
-    val jarEntry = new JarEntry(textFile.getName)
-    jar.putNextEntry(jarEntry)
-
-    val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
-
-    in.close()
-    jar.close()
-    jarStream.close()
+    val jarUrl = TestUtils.createJar(Seq(textFile), jarFile)
 
     tmpFile = textFile
-    tmpJarUrl = jarFile.toURI.toURL.toString
+    tmpJarUrl = jarUrl.toString
   }
 
   test("Distributing files locally") {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9f70..b487cecfeb669 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark
 
 import java.io.{File, FileWriter}
+import java.util.concurrent.Semaphore
 
 import scala.io.Source
@@ -29,8 +30,59 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import scala.util.Try
 
 class FileSuite extends FunSuite with LocalSparkContext {
+  test("adding jars to classpath at the driver") {
+    val tmpDir = Files.createTempDir()
+    val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)
+    val jarFile = new File(tmpDir, "test.jar")
+    TestUtils.createJar(Seq(classFile), jarFile)
+
+    def canLoadClass(clazz: String) =
+      Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess
+
+    val loadedBefore = canLoadClass("HelloSpark")
+
+    val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test")
+      .set("spark.driver.loadAddedJars", "true")
+
+    var driverLoadedAfter = false
+    var childLoadedAfter = false
+
+    val sem = new Semaphore(1)
+    sem.acquire()
+
+    new Thread() {
+      override def run() {
+        val sc = new SparkContext(conf)
+        sc.addJar(jarFile.getAbsolutePath)
+        driverLoadedAfter = canLoadClass("HelloSpark")
+
+        // Test visibility in a child thread
+        val childSem = new Semaphore(1)
+        childSem.acquire()
+        new Thread() {
+          override def run() {
+            childLoadedAfter = canLoadClass("HelloSpark")
+            childSem.release()
+          }
+        }.start()
+
+        childSem.acquire()
+        // Shut down the context created in this thread so it cannot leak into later tests.
+        sc.stop()
+        sem.release()
+      }
+    }.start()
+    sem.acquire()
+
+    // Test visibility in the parent thread
+    val parentLoadedAfter = canLoadClass("HelloSpark")
+
+    assert(false === loadedBefore, "Class visible before being added")
+    assert(true === driverLoadedAfter, "Class was not visible after being added")
+    assert(true === childLoadedAfter, "Class was not visible to child thread after being added")
+    assert(false === parentLoadedAfter, "Class was visible to parent thread after being added")
+  }
+
   test("text files") {
     sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/TestUtils.scala b/core/src/test/scala/org/apache/spark/TestUtils.scala
new file mode 100644
index 0000000000000..6b72bacac47a2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/TestUtils.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+
+import java.io.{FileInputStream, FileOutputStream, File}
+import java.util.jar.{JarEntry, JarOutputStream}
+import java.net.{URL, URI}
+import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
+
+object TestUtils {
+
+  /** Create a jar file that contains this set of files. All files will be located at the root
+    * of the jar. */
+  def createJar(files: Seq[File], jarFile: File): URL = {
+    val jarFileStream = new FileOutputStream(jarFile)
+    val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+
+    for (file <- files) {
+      val jarEntry = new JarEntry(file.getName)
+      jarStream.putNextEntry(jarEntry)
+
+      // Copy the whole file into the jar entry, stopping at end-of-stream rather than after
+      // the first read, so files larger than the buffer are not truncated.
+      val in = new FileInputStream(file)
+      val buffer = new Array[Byte](10240)
+      var nRead = in.read(buffer, 0, buffer.length)
+      while (nRead != -1) {
+        jarStream.write(buffer, 0, nRead)
+        nRead = in.read(buffer, 0, buffer.length)
+      }
+      in.close()
+    }
+    jarStream.close()
+    jarFileStream.close()
+
+    jarFile.toURI.toURL
+  }
+
+  // Adapted from the JavaCompiler.java doc examples
+  private val SOURCE = JavaFileObject.Kind.SOURCE
+
+  private def createURI(name: String) = {
+    URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
+  }
+
+  private class JavaSourceFromString(val name: String, val code: String)
+    extends SimpleJavaFileObject(createURI(name), SOURCE) {
+    override def getCharContent(ignoreEncodingErrors: Boolean) = code
+  }
+
+  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+  def createCompiledClass(className: String, destDir: File): File = {
+    val compiler = ToolProvider.getSystemJavaCompiler
+    val sourceFile = new JavaSourceFromString(className, s"public class $className {}")
+
+    // Calling this outputs a class file in pwd. It's easier to just rename the file than
+    // build a custom FileManager that controls the output location.
+    compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
+
+    val fileName = className + ".class"
+    val result = new File(fileName)
+    if (!result.exists()) throw new Exception("Compiled file not found: " + fileName)
+    val out = new File(destDir, fileName)
+    result.renameTo(out)
+    out
+  }
+}
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index e16703292cc22..e4af0e5c4ff6c 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -64,7 +64,8 @@ and `addFile`.
 - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
 - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
   means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
-  or shared via NFS, GlusterFS, etc.
+  or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set,
+  then the file must be visible to the node running the SparkContext as well.
 
 Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
 Over time this can use up a significant amount of space and will need to be cleaned up.
diff --git a/docs/configuration.md b/docs/configuration.md
index a006224d5080c..f9ecb1cd53c11 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -392,6 +392,16 @@ Apart from these, the following properties are also available, and may be useful
     Port for the driver to listen on.
   </td>
 </tr>
+<tr>
+  <td>spark.driver.loadAddedJars</td>
+  <td>false</td>
+  <td>
+    If true, the driver installs a class loader so that JARs added through
+    <code>SparkContext.addJar</code> are also available to the SparkContext and the threads
+    it starts, in addition to being distributed to executors.
+  </td>
+</tr>
 <tr>
   <td>spark.broadcast.blockSize</td>
   <td>4096</td>
   <td>
     Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is
     too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
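
For context, a minimal end-to-end sketch of the behavior this patch enables, assuming the `spark.driver.loadAddedJars` flag and `addJar` change above; the jar path and the `com.example.ExtraUdf` class are hypothetical placeholders, not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverAddJarExample {
  def main(args: Array[String]): Unit = {
    // spark.driver.loadAddedJars defaults to false; enabling it makes the SparkContext install
    // a SparkURLClassLoader as the context class loader of the constructing thread.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("driver-addJar-example")
      .set("spark.driver.loadAddedJars", "true")
    val sc = new SparkContext(conf)

    // Hypothetical jar; with the flag enabled it is added to the driver's class loader in
    // addition to being shipped to executors.
    sc.addJar("/tmp/extra-udfs.jar")

    // Reflective loads on this thread (and on threads it starts afterwards) can now resolve
    // classes packaged in the added jar.
    val clazz = Class.forName("com.example.ExtraUdf", true,
      Thread.currentThread().getContextClassLoader)
    println(s"Loaded ${clazz.getName} on the driver")

    sc.stop()
  }
}
```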
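
The scheme dispatch added to `addJar` boils down to the mapping below. This standalone helper only illustrates that logic under the same assumption (file: and local: jars must already exist on the driver node); it is not code from the patch.

```scala
import java.net.{URI, URL}

// Sketch of how addJar picks the URL handed to the driver's class loader.
def driverJarUrl(path: String): Option[URL] = {
  val uri = new URI(path)
  uri.getScheme match {
    // Local paths: assume the jar already exists on the driver node and point at the file.
    case null | "file" | "local" => Some(new URL("file:" + uri.getPath))
    // Remote URLs: URLClassLoader can fetch these itself, so pass the URL through unchanged.
    case "http" | "https" | "ftp" => Some(new URL(path))
    // Anything else (for example hdfs:) is not supported by the driver-side loader.
    case _ => None
  }
}
```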
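
The "do this before all other initialization" comment and the parent/child assertions in the FileSuite test both rest on the same JVM rule: a new thread inherits the context class loader of the thread that creates it, while already-running threads are unaffected. A small illustration of that rule using plain JDK classes, not Spark code:

```scala
import java.net.{URL, URLClassLoader}

// Install a loader on the current thread.
val loader = new URLClassLoader(Array.empty[URL], getClass.getClassLoader)
Thread.currentThread.setContextClassLoader(loader)

// Threads created afterwards inherit it...
new Thread() {
  override def run() {
    assert(Thread.currentThread.getContextClassLoader eq loader)
  }
}.start()
// ...but threads that already existed keep whatever loader they had, which is why the
// test's parent thread is expected not to see the added class.
```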
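
The new TestUtils helpers are meant to compose; a hypothetical snippet (class and file names are illustrative only):

```scala
import java.io.File
import com.google.common.io.Files
import org.apache.spark.TestUtils

val tmpDir = Files.createTempDir()
// Compile a trivial class into tmpDir, then package it at the root of a jar.
val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)
val jarFile = new File(tmpDir, "hello.jar")
val jarUrl = TestUtils.createJar(Seq(classFile), jarFile)
// sc.addJar(jarFile.getAbsolutePath) then makes HelloSpark loadable on executors, and on the
// driver as well when spark.driver.loadAddedJars is enabled.
```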