[SPARK-33925][CORE] Remove unused SecurityManager in Utils.fetchFile
### What changes were proposed in this pull request?

This is a follow-up of apache#24033.
The only usage of the `SecurityManager` argument was removed in apache#24033.
After that, we no longer need to pass `SecurityManager` to `Utils.fetchFile` and related code paths.

This PR proposes to remove it.
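
For illustration only, here is a minimal sketch of what a caller looks like after this change, modelled on the `Executor` call site in the diff below. The package, object, and parameter names are hypothetical; `Utils` is `private[spark]`, so real callers live inside Spark itself:

```scala
// Hypothetical illustration only -- package, object and parameter names are made up.
// Utils is private[spark], so a caller must live under the org.apache.spark package.
package org.apache.spark.example

import java.io.File

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkFiles}
import org.apache.spark.util.Utils

object FetchFileExample {
  // Mirrors the Executor call site in the diff below: the former
  // env.securityManager argument is simply dropped from Utils.fetchFile.
  def fetchToRootDir(
      name: String,
      conf: SparkConf,
      hadoopConf: Configuration,
      timestamp: Long,
      isLocal: Boolean): File = {
    Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
      hadoopConf, timestamp, useCache = !isLocal)
  }
}
```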

### Why are the changes needed?

For better code readability.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually compiled. GitHub Actions and Jenkins builds should test it as well.

Closes apache#30945 from HyukjinKwon/SPARK-33925.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit b33fa53)
HyukjinKwon authored and Egor Krivokon committed Oct 27, 2021
1 parent 13cb416 commit 6624406
Showing 6 changed files with 28 additions and 38 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -376,13 +376,13 @@ private[spark] class SparkSubmit extends Logging {
var localPyFiles: String = null
if (deployMode == CLIENT) {
localPrimaryResource = Option(args.primaryResource).map {
- downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFile(_, targetDir, sparkConf, hadoopConf)
}.orNull
localJars = Option(args.jars).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
localPyFiles = Option(args.pyFiles).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull

if (isKubernetesClusterModeDriver) {
@@ -391,14 +391,14 @@ private[spark] class SparkSubmit extends Logging {
// Explicitly download the related files here
args.jars = localJars
val filesLocalFiles = Option(args.files).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
val archiveLocalFiles = Option(args.archives).map { uris =>
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val localArchives = downloadFileList(
resolvedUris.map(
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
- targetDir, sparkConf, hadoopConf, secMgr)
+ targetDir, sparkConf, hadoopConf)

// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
@@ -449,7 +449,7 @@ private[spark] class SparkSubmit extends Logging {
if (file.exists()) {
file.toURI.toString
} else {
- downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFile(resource, targetDir, sparkConf, hadoopConf)
}
case _ => uri.toString
}
@@ -92,8 +92,7 @@ object DriverWrapper extends Logging {
jarsProp
}
}
- val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
-   secMgr)
+ val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)
DependencyUtils.addJarsToClassPath(localJars, loader)
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -949,15 +949,15 @@ private[spark] class Executor(
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
logInfo(s"Fetching $name with timestamp $timestamp")
val sourceURI = new URI(name)
val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
+ hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
@@ -976,7 +976,7 @@ private[spark] class Executor(
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ hadoopConf, timestamp, useCache = !isLocal)
currentJars(name) = timestamp
// Add it to our class loader
val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
38 changes: 16 additions & 22 deletions core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

- import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -187,11 +187,10 @@ private[spark] object DependencyUtils extends Logging {
}

def resolveAndDownloadJars(
- jars: String,
- userJar: String,
- sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ jars: String,
+ userJar: String,
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): String = {
val targetDir = Utils.createTempDir()
val userJarName = userJar.split(File.separatorChar).last
Option(jars)
@@ -202,7 +201,7 @@ private[spark] object DependencyUtils extends Logging {
.mkString(",")
}
.filterNot(_ == "")
- .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
+ .map(downloadFileList(_, targetDir, sparkConf, hadoopConf))
.orNull
}

@@ -222,18 +221,16 @@ private[spark] object DependencyUtils extends Logging {
* @param targetDir A temporary directory for which downloaded files.
* @param sparkConf Spark configuration.
* @param hadoopConf Hadoop configuration.
- * @param secMgr Spark security manager.
* @return A comma separated local files list.
*/
def downloadFileList(
- fileList: String,
- targetDir: File,
- sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ fileList: String,
+ targetDir: File,
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): String = {
require(fileList != null, "fileList cannot be null.")
Utils.stringToSeq(fileList)
- .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
+ .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
.mkString(",")
}

@@ -245,15 +242,13 @@ private[spark] object DependencyUtils extends Logging {
* @param targetDir A temporary directory for which downloaded files.
* @param sparkConf Spark configuration.
* @param hadoopConf Hadoop configuration.
- * @param secMgr Spark security manager.
* @return Path to the local file.
*/
def downloadFile(
- path: String,
- targetDir: File,
- sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ path: String,
+ targetDir: File,
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): String = {
require(path != null, "path cannot be null.")
val uri = Utils.resolveURI(path)

@@ -266,8 +261,7 @@ private[spark] object DependencyUtils extends Logging {
new File(targetDir, file.getName).toURI.toString
case _ =>
val fname = new Path(uri).getName()
- val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
-   hadoopConf)
+ val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, hadoopConf)
localFile.toURI().toString()
}
}
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -504,7 +504,6 @@ private[spark] object Utils extends Logging {
url: String,
targetDir: File,
conf: SparkConf,
- securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean,
@@ -533,7 +532,7 @@ private[spark] object Utils extends Logging {
val cachedFile = new File(localDir, cachedFileName)
try {
if (!cachedFile.exists()) {
- doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+ doFetchFile(url, localDir, cachedFileName, conf, hadoopConf)
}
} finally {
lock.release()
@@ -546,7 +545,7 @@ private[spark] object Utils extends Logging {
conf.getBoolean("spark.files.overwrite", false)
)
} else {
- doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+ doFetchFile(url, targetDir, fileName, conf, hadoopConf)
}

if (shouldUntar) {
@@ -749,7 +748,6 @@ private[spark] object Utils extends Logging {
targetDir: File,
filename: String,
conf: SparkConf,
- securityMgr: SecurityManager,
hadoopConf: Configuration): File = {
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
@@ -30,7 +30,6 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, TestUtils}
- import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
