[SPARK-48238][BUILD][YARN] Replace YARN AmIpFilter with a forked implementation

### What changes were proposed in this pull request?

This PR replaces AmIpFilter with a forked implementation, and removes the dependency `hadoop-yarn-server-web-proxy`.

### Why are the changes needed?

SPARK-47118 upgraded the Spark built-in Jetty from 10 to 11 and migrated from `javax.servlet` to `jakarta.servlet`, which breaks Spark on YARN.

```
Caused by: java.lang.IllegalStateException: class org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter is not a jakarta.servlet.Filter
    at org.sparkproject.jetty.servlet.FilterHolder.doStart(FilterHolder.java:99)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.sparkproject.jetty.servlet.ServletHandler.lambda$initialize$2(ServletHandler.java:724)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at org.sparkproject.jetty.servlet.ServletHandler.initialize(ServletHandler.java:749)
    ... 38 more
```

During the investigation, I found a comment here: apache#31642 (comment)

> Agree that in the long term we should either: 1) consider to re-implement the logic in Spark which allows us to get away from server-side dependency in Hadoop ...

This is a simple and clean way to address the exact issue: we don't need to wait for the Hadoop `jakarta.servlet` migration, and it also strips a Hadoop dependency.

### Does this PR introduce _any_ user-facing change?

No, this recovers the bootstrap of Spark applications in YARN mode, keeping the same behavior as Spark 3.5 and earlier versions.

### How was this patch tested?

UTs are added (refer to `org.apache.hadoop.yarn.server.webproxy.amfilter.TestAmFilter`).

I also tested it in a YARN cluster; Spark started successfully.

```
root@hadoop-master1:/opt/spark-SPARK-48238# JAVA_HOME=/opt/openjdk-17 bin/spark-sql --conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/openjdk-17 --conf spark.executorEnv.JAVA_HOME=/opt/openjdk-17
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-05-18 04:11:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-05-18 04:11:44 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive} is set, falling back to uploading libraries under SPARK_HOME.
Spark Web UI available at http://hadoop-master1.orb.local:4040
Spark master: yarn, Application Id: application_1716005503866_0001
spark-sql (default)> select version();
4.0.0 4ddc230
Time taken: 1.707 seconds, Fetched 1 row(s)
spark-sql (default)>
```

When accessing `http://hadoop-master1.orb.local:4040`, it redirects to `http://hadoop-master1.orb.local:8088/proxy/redirect/application_1716005503866_0001/`, and the UI looks correct.

<img width="1474" alt="image" src="https://github.com/apache/spark/assets/26535726/8500fc83-48c5-4603-8d05-37855f0308ae">

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46611 from pan3793/SPARK-48238.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
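For context on the failure above: `javax.servlet.Filter` and `jakarta.servlet.Filter` are unrelated interfaces, so Jetty 11 rejects any filter compiled against the old package regardless of how it is loaded. Below is a minimal sketch of the failing check, assuming Hadoop's filter and the jakarta servlet API are both on the classpath; it is illustrative, not Jetty's actual source.

```java
// Sketch of why Jetty 11's FilterHolder fails: it instantiates the configured
// filter class reflectively and then requires a jakarta.servlet.Filter.
// Hadoop's AmIpFilter implements javax.servlet.Filter, an unrelated type,
// so the instanceof check below can never pass.
public final class FilterApiMismatchSketch {
  public static void main(String[] args) throws Exception {
    Class<?> cls = Class.forName(
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter");
    Object instance = cls.getDeclaredConstructor().newInstance();
    if (!(instance instanceof jakarta.servlet.Filter)) {
      // Roughly the IllegalStateException seen in the stack trace above.
      throw new IllegalStateException(
          cls.getName() + " is not a jakarta.servlet.Filter");
    }
  }
}
```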
1 parent 6fcdaab · commit 4fc2910
Showing 10 changed files with 798 additions and 84 deletions.
resource-managers/yarn/src/main/java/org/apache/spark/deploy/yarn/AmIpFilter.java (239 additions, 0 deletions)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

import jakarta.servlet.*;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;

// This class is copied from Hadoop 3.4.0
// org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
//
// Modification:
//   Migrate from javax.servlet to jakarta.servlet
//   Copy constant string definitions to strip external dependency
//   - RM_HA_URLS
//   - PROXY_USER_COOKIE_NAME
@Public
public class AmIpFilter implements Filter {
  private static final SparkLogger LOG = SparkLoggerFactory.getLogger(AmIpFilter.class);

  @Deprecated
  public static final String PROXY_HOST = "PROXY_HOST";
  @Deprecated
  public static final String PROXY_URI_BASE = "PROXY_URI_BASE";
  public static final String PROXY_HOSTS = "PROXY_HOSTS";
  public static final String PROXY_HOSTS_DELIMITER = ",";
  public static final String PROXY_URI_BASES = "PROXY_URI_BASES";
  public static final String PROXY_URI_BASES_DELIMITER = ",";
  private static final String PROXY_PATH = "/proxy";
  // RM_HA_URLS is defined in AmFilterInitializer in the original Hadoop code
  private static final String RM_HA_URLS = "RM_HA_URLS";
  // PROXY_USER_COOKIE_NAME is defined in WebAppProxyServlet in the original Hadoop code
  public static final String PROXY_USER_COOKIE_NAME = "proxy-user";
  // update the proxy IP list about every 5 min
  private static long updateInterval = TimeUnit.MINUTES.toMillis(5);

  private String[] proxyHosts;
  private Set<String> proxyAddresses = null;
  private long lastUpdate;
  @VisibleForTesting
  Map<String, String> proxyUriBases;
  String[] rmUrls = null;

  @Override
  public void init(FilterConfig conf) throws ServletException {
    // Maintain for backwards compatibility
    if (conf.getInitParameter(PROXY_HOST) != null
        && conf.getInitParameter(PROXY_URI_BASE) != null) {
      proxyHosts = new String[]{conf.getInitParameter(PROXY_HOST)};
      proxyUriBases = new HashMap<>(1);
      proxyUriBases.put("dummy", conf.getInitParameter(PROXY_URI_BASE));
    } else {
      proxyHosts = conf.getInitParameter(PROXY_HOSTS)
          .split(PROXY_HOSTS_DELIMITER);

      String[] proxyUriBasesArr = conf.getInitParameter(PROXY_URI_BASES)
          .split(PROXY_URI_BASES_DELIMITER);
      proxyUriBases = new HashMap<>(proxyUriBasesArr.length);
      for (String proxyUriBase : proxyUriBasesArr) {
        try {
          URL url = new URL(proxyUriBase);
          proxyUriBases.put(url.getHost() + ":" + url.getPort(), proxyUriBase);
        } catch (MalformedURLException e) {
          LOG.warn(proxyUriBase + " does not appear to be a valid URL", e);
        }
      }
    }

    if (conf.getInitParameter(RM_HA_URLS) != null) {
      rmUrls = conf.getInitParameter(RM_HA_URLS).split(",");
    }
  }

  protected Set<String> getProxyAddresses() throws ServletException {
    long now = Time.monotonicNow();
    synchronized (this) {
      if (proxyAddresses == null || (lastUpdate + updateInterval) <= now) {
        proxyAddresses = new HashSet<>();
        for (String proxyHost : proxyHosts) {
          try {
            for (InetAddress add : InetAddress.getAllByName(proxyHost)) {
              LOG.debug("proxy address is: {}", add.getHostAddress());
              proxyAddresses.add(add.getHostAddress());
            }
            lastUpdate = now;
          } catch (UnknownHostException e) {
            LOG.warn("Could not locate " + proxyHost + " - skipping", e);
          }
        }
        if (proxyAddresses.isEmpty()) {
          throw new ServletException("Could not locate any of the proxy hosts");
        }
      }
      return proxyAddresses;
    }
  }

  @Override
  public void destroy() {
    // Empty
  }

  @Override
  public void doFilter(ServletRequest req, ServletResponse resp,
      FilterChain chain) throws IOException, ServletException {
    ProxyUtils.rejectNonHttpRequests(req);

    HttpServletRequest httpReq = (HttpServletRequest) req;
    HttpServletResponse httpResp = (HttpServletResponse) resp;

    LOG.debug("Remote address for request is: {}", httpReq.getRemoteAddr());

    if (!getProxyAddresses().contains(httpReq.getRemoteAddr())) {
      StringBuilder redirect = new StringBuilder(findRedirectUrl());

      redirect.append(httpReq.getRequestURI());

      int insertPoint = redirect.indexOf(PROXY_PATH);

      if (insertPoint >= 0) {
        // Add /redirect as the second component of the path so that the RM web
        // proxy knows that this request was a redirect.
        insertPoint += PROXY_PATH.length();
        redirect.insert(insertPoint, "/redirect");
      }
      // add the query parameters on the redirect if there were any
      String queryString = httpReq.getQueryString();
      if (queryString != null && !queryString.isEmpty()) {
        redirect.append("?");
        redirect.append(queryString);
      }

      ProxyUtils.sendRedirect(httpReq, httpResp, redirect.toString());
    } else {
      String user = null;

      if (httpReq.getCookies() != null) {
        for (Cookie c : httpReq.getCookies()) {
          if (PROXY_USER_COOKIE_NAME.equals(c.getName())) {
            user = c.getValue();
            break;
          }
        }
      }
      if (user == null) {
        LOG.debug("Could not find {} cookie, so user will not be set",
            PROXY_USER_COOKIE_NAME);

        chain.doFilter(req, resp);
      } else {
        AmIpPrincipal principal = new AmIpPrincipal(user);
        ServletRequest requestWrapper = new AmIpServletRequestWrapper(httpReq,
            principal);

        chain.doFilter(requestWrapper, resp);
      }
    }
  }

  @VisibleForTesting
  public String findRedirectUrl() throws ServletException {
    String addr = null;
    if (proxyUriBases.size() == 1) {
      // external proxy or not RM HA
      addr = proxyUriBases.values().iterator().next();
    } else if (rmUrls != null) {
      for (String url : rmUrls) {
        String host = proxyUriBases.get(url);
        if (isValidUrl(host)) {
          addr = host;
          break;
        }
      }
    }

    if (addr == null) {
      throw new ServletException(
          "Could not determine the proxy server for redirection");
    }
    return addr;
  }

  @VisibleForTesting
  public boolean isValidUrl(String url) {
    boolean isValid = false;
    try {
      HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
      conn.connect();
      isValid = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
      // If security is enabled, any valid RM which can give 401 Unauthorized is
      // good enough to access. Since AM doesn't have enough credential, auth
      // cannot be completed and hence 401 is fine in such case.
      if (!isValid && UserGroupInformation.isSecurityEnabled()) {
        isValid = (conn.getResponseCode() == HttpURLConnection.HTTP_UNAUTHORIZED)
            || (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN);
        return isValid;
      }
    } catch (Exception e) {
      LOG.warn("Failed to connect to " + url + ": " + e.toString());
    }
    return isValid;
  }

  @VisibleForTesting
  protected static void setUpdateInterval(long updateInterval) {
    AmIpFilter.updateInterval = updateInterval;
  }
}
```
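As a usage note: the init parameters above are supplied by the servlet container when the filter is registered. Below is a hedged sketch of wiring the forked filter into an embedded Jetty 11 `ServletContextHandler`; the host names and URI bases are made up, and Spark itself configures these parameters through its YARN ApplicationMaster (with Jetty shaded under `org.sparkproject.jetty`) rather than like this.

```java
import java.util.EnumSet;

import jakarta.servlet.DispatcherType;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;

import org.apache.spark.deploy.yarn.AmIpFilter;

public final class AmIpFilterRegistrationSketch {
  public static void register(ServletContextHandler handler) {
    FilterHolder holder = new FilterHolder(AmIpFilter.class);
    // Comma-separated RM proxy hosts; the filter resolves each host to its
    // IP addresses and refreshes the set roughly every five minutes.
    holder.setInitParameter(AmIpFilter.PROXY_HOSTS,
        "rm1.example.com,rm2.example.com");
    // One URI base per proxy host, keyed internally by host:port; requests
    // arriving from a non-proxy address are redirected to one of these.
    holder.setInitParameter(AmIpFilter.PROXY_URI_BASES,
        "http://rm1.example.com:8088/proxy/application_1,"
            + "http://rm2.example.com:8088/proxy/application_1");
    handler.addFilter(holder, "/*", EnumSet.of(DispatcherType.REQUEST));
  }
}
```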
resource-managers/yarn/src/main/java/org/apache/spark/deploy/yarn/AmIpPrincipal.java (35 additions, 0 deletions)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn;

import java.security.Principal;

// This class is copied from Hadoop 3.4.0
// org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpPrincipal
public class AmIpPrincipal implements Principal {
  private final String name;

  public AmIpPrincipal(String name) {
    this.name = name;
  }

  @Override
  public String getName() {
    return name;
  }
}
```
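`AmIpFilter.doFilter()` above also references `AmIpServletRequestWrapper`, which belongs to the same commit but is not among the files shown on this truncated page. A hedged sketch of what that wrapper looks like, assuming it mirrors Hadoop 3.4.0's `org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpServletRequestWrapper` ported to `jakarta.servlet`:

```java
package org.apache.spark.deploy.yarn;

import java.security.Principal;

import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletRequestWrapper;

// Sketch only: exposes the principal recovered from the proxy-user cookie
// through the standard HttpServletRequest user-identity methods.
public class AmIpServletRequestWrapper extends HttpServletRequestWrapper {
  private final AmIpPrincipal principal;

  public AmIpServletRequestWrapper(HttpServletRequest request,
      AmIpPrincipal principal) {
    super(request);
    this.principal = principal;
  }

  @Override
  public Principal getUserPrincipal() {
    return principal;
  }

  @Override
  public String getRemoteUser() {
    return principal.getName();
  }

  @Override
  public boolean isUserInRole(String role) {
    // The proxy-user cookie carries no role information.
    return false;
  }
}
```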