Skip to content

Commit

Permalink
fix and ut
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed May 17, 2024
1 parent fa3f369 commit 050e3d9
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@ class YarnAMIpFilter extends Filter with Logging {

import YarnAMIpFilter._

private[spark] class AmIpPrincipal(name: String) extends Principal {
override def getName: String = name
}

private var proxyHosts: Array[String] = _
private var proxyAddresses: Set[String] = _
private var lastUpdate: Long = 0L
private var proxyUriBases: Map[String, String] = _
private var rmUrls: Array[String] = _
private[spark] var proxyHosts: Array[String] = _
private[spark] var proxyAddresses: Set[String] = _
private[spark] var lastUpdate: Long = 0L
private[spark] var proxyUriBases: Map[String, String] = _
private[spark] var rmUrls: Array[String] = _

@throws[ServletException]
override def init(conf: FilterConfig): Unit = {
Expand Down Expand Up @@ -74,7 +70,7 @@ class YarnAMIpFilter extends Filter with Logging {
}

@throws[ServletException]
private def getProxyAddresses: Set[String] = {
protected[spark] def getProxyAddresses: Set[String] = {
val now = System.nanoTime()
if (proxyAddresses == null || (lastUpdate + updateInterval) <= now) {
this.synchronized {
Expand Down Expand Up @@ -130,11 +126,7 @@ class YarnAMIpFilter extends Filter with Logging {
} match {
case Some(user) =>
val principal = new AmIpPrincipal(user)
val requestWrapper = new HttpServletRequestWrapper(httpReq) {
override def getUserPrincipal: Principal = principal
override def getRemoteUser: String = principal.getName
override def isUserInRole(role: String): Boolean = false
}
val requestWrapper = new AmIpServletRequestWrapper(httpReq, principal)
chain.doFilter(requestWrapper, resp)
case None =>
logDebug(s"Could not find $PROXY_USER_COOKIE_NAME cookie, so user will not be set")
Expand All @@ -144,12 +136,12 @@ class YarnAMIpFilter extends Filter with Logging {
}

@throws[ServletException]
private def findRedirectUrl: String = {
private[spark] def findRedirectUrl: String = {
val addr = if (proxyUriBases.size == 1) {
// external proxy or not RM HA
Some(proxyUriBases.values.iterator.next)
Some(proxyUriBases.values.iterator.next())
} else if (rmUrls != null) {
rmUrls.find { url => isValidUrl(proxyUriBases(url)) }
rmUrls.map(url => proxyUriBases(url)).find { host => isValidUrl(host) }
} else {
None
}
Expand Down Expand Up @@ -245,5 +237,20 @@ private[spark] object YarnAMIpFilter {
val E_HTTP_HTTPS_ONLY = "This filter only works for HTTP/HTTPS"
val LOCATION = "Location"
// update the proxy IP list about every 5 min
val updateInterval = TimeUnit.MINUTES.toNanos(5)
var updateInterval = TimeUnit.MINUTES.toNanos(5)

// only for testing
def setUpdateInterval(ns: Long): Unit = updateInterval = ns

private[spark] class AmIpPrincipal(name: String) extends Principal {
override def getName: String = name
}

private[spark] class AmIpServletRequestWrapper(
httpReq: HttpServletRequest,
principal: AmIpPrincipal) extends HttpServletRequestWrapper(httpReq) {
override def getUserPrincipal: Principal = principal
override def getRemoteUser: String = principal.getName
override def isUserInRole(role: String): Boolean = false
}
}
Loading

0 comments on commit 050e3d9

Please sign in to comment.