[SPARK-17650] malformed url's throw exceptions before bricking Executors #15224

Closed · wants to merge 2 commits
Changes from 1 commit
core/src/main/scala/org/apache/spark/SparkContext.scala (9 additions, 7 deletions)
@@ -19,7 +19,7 @@ package org.apache.spark

 import java.io._
 import java.lang.reflect.Constructor
-import java.net.URI
+import java.net.{MalformedURLException, URI}
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-  TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat,
-  WholeTextFileInputFormat}
+import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -1452,6 +1449,9 @@ class SparkContext(config: SparkConf) extends Logging {
         throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
           "turned on.")
       }
+    } else {
+      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+      Utils.validateURL(uri)
     }
 
     val key = if (!isLocal && scheme == "file") {
@@ -1711,6 +1711,8 @@ class SparkContext(config: SparkConf) extends Logging {
         key = env.rpcEnv.fileServer.addJar(new File(path))
       } else {
         val uri = new URI(path)
+        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+        Utils.validateURL(uri)
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
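Taken together, the two hunks above make dependency registration fail fast on the driver. A minimal sketch of the resulting behavior (the local master, app name, and example URL are illustrative, not taken from the patch):

    import java.net.MalformedURLException

    import org.apache.spark.{SparkConf, SparkContext}

    object FailFastDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
        try {
          // "pwd" is not a numeric port, so this URI can never become a URL.
          sc.addFile("http://user:pwd/path") // throws MalformedURLException on the driver
        } catch {
          case e: MalformedURLException => println(s"rejected up front: ${e.getMessage}")
        } finally {
          sc.stop()
        }
      }
    }

Before this change, the bad path was accepted here and only blew up later, when each executor tried to fetch the dependency — hence "bricking Executors" in the title.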
core/src/main/scala/org/apache/spark/util/Utils.scala (22 additions, 0 deletions)
@@ -697,6 +697,28 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Validate that a given URI is actually a valid URL as well.
+   * @param uri The URI to validate
+   */
+  @throws[MalformedURLException]("when the URI is an invalid URL")
+  def validateURL(uri: URI): Unit = {
+    Option(uri.getScheme).getOrElse("file") match {
+      case "http" | "https" | "ftp" =>
+        try {
+          uri.toURL
+        } catch {
+          case e: MalformedURLException =>
+            val msg = s"URI (${uri.toString}) is not a valid URL."
+            logError(msg)
+            val ex = new MalformedURLException(msg)
+            ex.initCause(e)
+            throw ex
+        }
+      case _ => // will not be turned into a URL anyway
+    }
+  }
+

   /**
    * Get the path of a temporary directory. Spark's local directories can be configured through
    * multiple settings, which are used with the following precedence:

Review comment (Member), on the logError(msg) line: nit: no need to log it, since it's already being thrown.
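The check works because java.net.URI is deliberately lenient while java.net.URL is strict for the schemes listed above. A small standalone sketch of that asymmetry (the object name is invented for illustration):

    import java.net.{MalformedURLException, URI}

    object UriVsUrlDemo {
      def main(args: Array[String]): Unit = {
        // URI happily parses "user:pwd" as a registry-based authority with no host.
        val uri = new URI("http://user:pwd/path")
        println(uri.getAuthority) // user:pwd
        println(uri.getHost)      // null

        // URL requires host[:port] with a numeric port, so the conversion fails.
        try uri.toURL
        catch {
          case e: MalformedURLException => println(s"not a URL: ${e.getMessage}")
        }
      }
    }

For URIs whose scheme is not http, https, or ftp (or that have no scheme at all, which validateURL treats as "file"), the match falls through to the empty default case, since Spark never converts those values to URLs.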
core/src/test/scala/org/apache/spark/SparkContextSuite.scala (22 additions, 0 deletions)
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import java.net.MalformedURLException
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit

@@ -173,6 +174,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
Seq("http", "https", "ftp").foreach { scheme =>
val badURL = s"$scheme://user:pwd/path"
val e1 = intercept[MalformedURLException] {
sc.addFile(badURL)
}
assert(e1.getMessage.contains(badURL))
val e2 = intercept[MalformedURLException] {
sc.addJar(badURL)
}
assert(e2.getMessage.contains(badURL))
assert(sc.addedFiles.isEmpty)
assert(sc.addedJars.isEmpty)
}
} finally {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
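Note that besides intercepting the exception for each scheme, the test asserts that addedFiles and addedJars stay empty, i.e. a malformed URL must not be partially registered before the exception is thrown. To run just this suite locally, Spark's usual invocation should work (assumed from the project's standard developer workflow, not stated in this PR): build/sbt "core/testOnly *SparkContextSuite".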