From 587547313ed981819e6fc9bb70a83a9aa5c04a4f Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Wed, 28 Jul 2021 11:00:25 +0200
Subject: [PATCH 1/2] .

---
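Context for reviewers: in cluster deploy modes, spark-submit must not let a
proxy-user submission extend its own JVM classpath, because everything on
childClasspath is loaded into the submitter's JVM and the doAs() wrapper is
not a security boundary. A simplified sketch of the pre-existing proxy-user
dispatch in SparkSubmit (shown only for context, not part of this diff;
runMain, args and uninitLog are members of the surrounding SparkSubmit code):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Classes placed on childClasspath run in this same JVM, so a malicious
    // jar could act with the submitting (real) user's privileges even though
    // the application itself runs inside doAs() as the proxy user.
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(
        args.proxyUser, UserGroupInformation.getCurrentUser())
      proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = runMain(args, uninitLog)
      })
    } else {
      runMain(args, uninitLog)
    }

This patch therefore skips Maven dependency resolution for proxy-user
submissions in K8s cluster mode, keeps the primary resource (and user jars)
off spark-submit's own classpath in YARN cluster mode, and asserts that
childClasspath stays empty whenever cluster mode is combined with a proxy
user.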
 .../org/apache/spark/deploy/SparkSubmit.scala | 59 ++++++++++---------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 745836dfbefe7..46cf100c369a3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -305,34 +305,37 @@ private[spark] class SparkSubmit extends Logging {
     val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
     val isKubernetesClusterModeDriver = isKubernetesClient &&
       sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
+    val isProxyUser = args.proxyUser != null
 
     if (!isMesosCluster && !isStandAloneCluster) {
-      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-      // too for packages that include Python code
-      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-        packagesTransitive = true, args.packagesExclusions, args.packages,
-        args.repositories, args.ivyRepoPath, args.ivySettingsPath)
-
-      if (resolvedMavenCoordinates.nonEmpty) {
-        // In K8s client mode, when in the driver, add resolved jars early as we might need
-        // them at the submit time for artifact downloading.
-        // For example we might use the dependencies for downloading
-        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
-        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
-        if (isKubernetesClusterModeDriver) {
-          val loader = getSubmitClassLoader(sparkConf)
-          for (jar <- resolvedMavenCoordinates) {
-            addJarToClasspath(jar, loader)
-          }
-        } else if (isKubernetesCluster) {
-          // We need this in K8s cluster mode so that we can upload local deps
-          // via the k8s application, like in cluster mode driver
-          childClasspath ++= resolvedMavenCoordinates
-        } else {
-          args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
-          if (args.isPython || isInternal(args.primaryResource)) {
-            args.pyFiles = mergeFileLists(args.pyFiles,
-              mergeFileLists(resolvedMavenCoordinates: _*))
+      if (!(isKubernetesCluster && isProxyUser)) {
+        // Resolve maven dependencies if there are any and add classpath to jars. Add them to
+        // py-files too for packages that include Python code
+        val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+          packagesTransitive = true, args.packagesExclusions, args.packages,
+          args.repositories, args.ivyRepoPath, args.ivySettingsPath)
+
+        if (resolvedMavenCoordinates.nonEmpty) {
+          // In K8s client mode, when in the driver, add resolved jars early as we might need
+          // them at the submit time for artifact downloading.
+          // For example we might use the dependencies for downloading
+          // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
+          // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+          if (isKubernetesClusterModeDriver) {
+            val loader = getSubmitClassLoader(sparkConf)
+            for (jar <- resolvedMavenCoordinates) {
+              addJarToClasspath(jar, loader)
+            }
+          } else if (isKubernetesCluster) {
+            // We need this in K8s cluster mode so that we can upload local deps
+            // via the k8s application, like in cluster mode driver
+            childClasspath ++= resolvedMavenCoordinates
+          } else {
+            args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
+            if (args.isPython || isInternal(args.primaryResource)) {
+              args.pyFiles = mergeFileLists(args.pyFiles,
+                mergeFileLists(resolvedMavenCoordinates: _*))
+            }
           }
         }
       }
@@ -691,7 +694,7 @@ private[spark] class SparkSubmit extends Logging {
     // This assumes both primaryResource and user jars are local jars, or already downloaded
     // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
     // added to the classpath of YARN client.
-    if (isYarnCluster) {
+    if (isYarnCluster && !isProxyUser) {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
@@ -934,6 +937,8 @@ private[spark] class SparkSubmit extends Logging {
       logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
       logInfo("\n")
     }
+    assert(!(args.deployMode == "cluster" && args.proxyUser != null && childClasspath.nonEmpty),
+      "Classpath of spark-submit should not change in cluster mode if proxy user is specified")
     val loader = getSubmitClassLoader(sparkConf)
     for (jar <- childClasspath) {
       addJarToClasspath(jar, loader)

From a3dc45ab10ab2974d87f6492fb8518bb3b47cea2 Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Fri, 6 Jan 2023 20:59:53 +0800
Subject: [PATCH 2/2] .

---
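Follow-up: the unconditional restriction from the previous patch can be too
strict for deployments that legitimately combine proxy users with custom
classpaths, so this patch gates it behind a new internal config,
spark.submit.proxyUser.allowCustomClasspathInClusterMode (default: false).
When the flag is disabled and a proxy user is specified in any cluster mode
(YARN, Mesos, standalone or K8s), childClasspath is dropped with a warning
instead of failing the assertion. An illustrative submission that is affected
(the options are real spark-submit flags; the user name and jar paths are
placeholders):

    spark-submit \
      --master yarn --deploy-mode cluster \
      --proxy-user alice \
      --jars /path/to/extra.jar \
      app.jar

With the default (false), /path/to/extra.jar is still shipped to the cluster
but is no longer added to spark-submit's own JVM classpath; an operator can
restore the old behaviour with
--conf spark.submit.proxyUser.allowCustomClasspathInClusterMode=true.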
 .../org/apache/spark/deploy/SparkSubmit.scala | 74 +++++++++++--------
 .../spark/internal/config/package.scala       |  7 ++
 2 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 46cf100c369a3..fe4b41312ca55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -305,37 +305,38 @@ private[spark] class SparkSubmit extends Logging {
     val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
     val isKubernetesClusterModeDriver = isKubernetesClient &&
       sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
-    val isProxyUser = args.proxyUser != null
+    val isCustomClasspathInClusterModeDisallowed =
+      !sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
+      args.proxyUser != null &&
+      (isYarnCluster || isMesosCluster || isStandAloneCluster || isKubernetesCluster)
 
     if (!isMesosCluster && !isStandAloneCluster) {
-      if (!(isKubernetesCluster && isProxyUser)) {
-        // Resolve maven dependencies if there are any and add classpath to jars. Add them to
-        // py-files too for packages that include Python code
-        val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-          packagesTransitive = true, args.packagesExclusions, args.packages,
-          args.repositories, args.ivyRepoPath, args.ivySettingsPath)
-
-        if (resolvedMavenCoordinates.nonEmpty) {
-          // In K8s client mode, when in the driver, add resolved jars early as we might need
-          // them at the submit time for artifact downloading.
-          // For example we might use the dependencies for downloading
-          // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
-          // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
-          if (isKubernetesClusterModeDriver) {
-            val loader = getSubmitClassLoader(sparkConf)
-            for (jar <- resolvedMavenCoordinates) {
-              addJarToClasspath(jar, loader)
-            }
-          } else if (isKubernetesCluster) {
-            // We need this in K8s cluster mode so that we can upload local deps
-            // via the k8s application, like in cluster mode driver
-            childClasspath ++= resolvedMavenCoordinates
-          } else {
-            args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
-            if (args.isPython || isInternal(args.primaryResource)) {
-              args.pyFiles = mergeFileLists(args.pyFiles,
-                mergeFileLists(resolvedMavenCoordinates: _*))
-            }
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+        packagesTransitive = true, args.packagesExclusions, args.packages,
+        args.repositories, args.ivyRepoPath, args.ivySettingsPath)
+
+      if (resolvedMavenCoordinates.nonEmpty) {
+        // In K8s client mode, when in the driver, add resolved jars early as we might need
+        // them at the submit time for artifact downloading.
+        // For example we might use the dependencies for downloading
+        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
+        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
+        if (isKubernetesClusterModeDriver) {
+          val loader = getSubmitClassLoader(sparkConf)
+          for (jar <- resolvedMavenCoordinates) {
+            addJarToClasspath(jar, loader)
+          }
+        } else if (isKubernetesCluster) {
+          // We need this in K8s cluster mode so that we can upload local deps
+          // via the k8s application, like in cluster mode driver
+          childClasspath ++= resolvedMavenCoordinates
+        } else {
+          args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
+          if (args.isPython || isInternal(args.primaryResource)) {
+            args.pyFiles = mergeFileLists(args.pyFiles,
+              mergeFileLists(resolvedMavenCoordinates: _*))
          }
        }
      }
@@ -694,7 +695,7 @@ private[spark] class SparkSubmit extends Logging {
     // This assumes both primaryResource and user jars are local jars, or already downloaded
     // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
     // added to the classpath of YARN client.
-    if (isYarnCluster && !isProxyUser) {
+    if (isYarnCluster) {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
@@ -884,6 +885,13 @@ private[spark] class SparkSubmit extends Logging {
 
     sparkConf.set("spark.app.submitTime", System.currentTimeMillis().toString)
 
+    if (childClasspath.nonEmpty && isCustomClasspathInClusterModeDisallowed) {
+      logWarning(s"Ignoring classpath ${childClasspath.mkString(", ")} with proxy user " +
+        s"specified in cluster mode when " +
+        s"${ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE.key} is disabled")
+      childClasspath.clear()
+    }
+
     (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
   }
 
@@ -937,8 +945,10 @@ private[spark] class SparkSubmit extends Logging {
       logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
       logInfo("\n")
     }
-    assert(!(args.deployMode == "cluster" && args.proxyUser != null && childClasspath.nonEmpty),
-      "Classpath of spark-submit should not change in cluster mode if proxy user is specified")
+    assert(!(args.deployMode == "cluster" && args.proxyUser != null && childClasspath.nonEmpty) ||
+      sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE),
+      "Classpath of spark-submit should not change in cluster mode when a proxy user is " +
+      s"specified and ${ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE.key} is disabled")
     val loader = getSubmitClassLoader(sparkConf)
     for (jar <- childClasspath) {
       addJarToClasspath(jar, loader)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index eb6ac8b765b01..be210cfe59b3d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2461,4 +2461,11 @@ package object config {
     .version("3.4.0")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("5s")
+
+  private[spark] val ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE =
+    ConfigBuilder("spark.submit.proxyUser.allowCustomClasspathInClusterMode")
+      .internal()
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
 }
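For completeness, a minimal sketch of how the new switch is read and toggled
(assuming test code compiled inside the org.apache.spark package, since the
entry is internal and private[spark]):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE

    val conf = new SparkConf()
    // Defaults to false: proxy users cannot customize the classpath in cluster mode.
    assert(!conf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE))
    // Operators opt back into the pre-patch behaviour explicitly.
    conf.set(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE.key, "true")
    assert(conf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE))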