From fc45425bc80a039994e57d2d98c0a6f281427654 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 8 Jan 2016 16:59:02 +0900 Subject: [PATCH 1/4] [SPARK-12708][UI] Sorting task error in Stages Page when yarn mode --- .../org/apache/spark/ui/jobs/StagePage.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2cc6c75a9ac12..3f09887277186 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -18,6 +18,7 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder +import java.net.URLDecoder import java.util.Date import javax.servlet.http.HttpServletRequest @@ -100,7 +101,19 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val parameterTaskPrevPageSize = request.getParameter("task.prevPageSize") val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1) - val taskSortColumn = Option(parameterTaskSortColumn).getOrElse("Index") + val taskSortColumn = Option(parameterTaskSortColumn).map { + sortColumn => + // If sortColumn contains "/", `getParameter("task.sort")` will return + // "%252F" when yarn mode. we need additional decode. + // See also SPARK-4313, YARN-2844. + var column = sortColumn + var decodedColumn = URLDecoder.decode(column, "UTF-8") + while (column != decodedColumn) { + column = decodedColumn + decodedColumn = URLDecoder.decode(column, "UTF-8") + } + column + }.getOrElse("Index") val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false) val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100) val taskPrevPageSize = Option(parameterTaskPrevPageSize).map(_.toInt).getOrElse(taskPageSize) From 62c04ca239d160d24bfb55dee6d2e827b8183f9a Mon Sep 17 00:00:00 2001 From: root Date: Fri, 8 Jan 2016 21:19:20 +0900 Subject: [PATCH 2/4] fixed PR comments --- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 3f09887277186..fcd10a3bba20b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -101,11 +101,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val parameterTaskPrevPageSize = request.getParameter("task.prevPageSize") val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1) - val taskSortColumn = Option(parameterTaskSortColumn).map { - sortColumn => - // If sortColumn contains "/", `getParameter("task.sort")` will return - // "%252F" when yarn mode. we need additional decode. - // See also SPARK-4313, YARN-2844. + val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn => + // SPARK-12708 + // Due to YARN-2844, "/" in the url will be encoded to "%252F" when + // running in yarn mode. `request.getParameter("task.sort")` will return + // "%252F". Therefore we need to decode it until we get the real column name. + // SPARK-4313 is similar to this issue. var column = sortColumn var decodedColumn = URLDecoder.decode(column, "UTF-8") while (column != decodedColumn) { From 3d508c6f6c69c2b2e4b29f8b9edf7048694e7f23 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 13 Jan 2016 22:06:20 +0900 Subject: [PATCH 3/4] Fix PR comments and similar issue in PoolPage. --- .../main/scala/org/apache/spark/ui/UIUtils.scala | 16 ++++++++++++++++ .../spark/ui/exec/ExecutorThreadDumpPage.scala | 15 +-------------- .../org/apache/spark/ui/jobs/PoolPage.scala | 2 +- .../org/apache/spark/ui/jobs/PoolTable.scala | 5 ++++- .../org/apache/spark/ui/jobs/StagePage.scala | 14 +------------- .../scala/org/apache/spark/ui/UIUtilsSuite.scala | 14 ++++++++++++++ 6 files changed, 37 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 81a6f07ec836a..1949c4b3cbf42 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui +import java.net.URLDecoder import java.text.SimpleDateFormat import java.util.{Date, Locale} @@ -451,4 +452,19 @@ private[spark] object UIUtils extends Logging { {desc} } } + + /** + * Decode URLParameter if URL is encoded by YARN-WebAppProxyServlet. + * Due to YARN-2844: WebAppProxyServlet cannot handle urls which contain encoded characters + * Therefore we need to decode it until we get the real URLParameter. + */ + def decodeURLParameter(urlParam: String): String = { + var param = urlParam + var decodedParam = URLDecoder.decode(param, "UTF-8") + while (param != decodedParam) { + param = decodedParam + decodedParam = URLDecoder.decode(param, "UTF-8") + } + param + } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index 1a6f0fdd50df7..5c0876530c58b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -17,7 +17,6 @@ package org.apache.spark.ui.exec -import java.net.URLDecoder import javax.servlet.http.HttpServletRequest import scala.util.Try @@ -30,19 +29,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage private val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { - val executorId = Option(request.getParameter("executorId")).map { - executorId => - // Due to YARN-2844, "" in the url will be encoded to "%25253Cdriver%25253E" when - // running in yarn-cluster mode. `request.getParameter("executorId")` will return - // "%253Cdriver%253E". Therefore we need to decode it until we get the real id. - var id = executorId - var decodedId = URLDecoder.decode(id, "UTF-8") - while (id != decodedId) { - id = decodedId - decodedId = URLDecoder.decode(id, "UTF-8") - } - id - }.getOrElse { + val executorId = Option(request.getParameter("executorId")).getOrElse { throw new IllegalArgumentException(s"Missing executorId parameter") } val time = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index fa30f2bda4272..757df6b68fd7f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -31,7 +31,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { - val poolName = request.getParameter("poolname") + val poolName = UIUtils.decodeURLParameter(request.getParameter("poolname")) require(poolName != null && poolName.nonEmpty, "Missing poolname parameter") val poolToActiveStages = listener.poolToActiveStages diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 9ba2af54dacf4..6965ce1254c18 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.jobs +import java.net.URLEncoder + import scala.collection.mutable.HashMap import scala.xml.Node @@ -59,7 +61,8 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { case None => 0 } val href = "%s/stages/pool?poolname=%s" - .format(UIUtils.prependBaseUri(parent.basePath), p.name) + .format(UIUtils.prependBaseUri(parent.basePath), + URLEncoder.encode(p.name, "UTF-8")) {p.name} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index fcd10a3bba20b..6d4066a870cdd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -18,7 +18,6 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder -import java.net.URLDecoder import java.util.Date import javax.servlet.http.HttpServletRequest @@ -102,18 +101,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1) val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn => - // SPARK-12708 - // Due to YARN-2844, "/" in the url will be encoded to "%252F" when - // running in yarn mode. `request.getParameter("task.sort")` will return - // "%252F". Therefore we need to decode it until we get the real column name. - // SPARK-4313 is similar to this issue. - var column = sortColumn - var decodedColumn = URLDecoder.decode(column, "UTF-8") - while (column != decodedColumn) { - column = decodedColumn - decodedColumn = URLDecoder.decode(column, "UTF-8") - } - column + UIUtils.decodeURLParameter(sortColumn) }.getOrElse("Index") val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false) val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100) diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala index dd8d5ec27f87e..7e532fd791da1 100644 --- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala @@ -67,6 +67,20 @@ class UIUtilsSuite extends SparkFunSuite { s"\nRunning progress bar should round down\n\nExpected:\n$expected\nGenerated:\n$generated") } + test("decodeURLParameter (SPARK-12708: Sorting task error in Stages Page when yarn mode.)") { + val encoded1 = "%252F" + val decoded1 = "/" + val encoded2 = "%253Cdriver%253E" + val decoded2 = "" + + assert(decoded1.sameElements(decodeURLParameter(encoded1))) + assert(decoded2.sameElements(decodeURLParameter(encoded2))) + + // verify that no affect to decoded URL. + assert(decoded1.sameElements(decodeURLParameter(decoded1))) + assert(decoded2.sameElements(decodeURLParameter(decoded2))) + } + private def verify( desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = ""): Unit = { val generated = makeDescription(desc, baseUrl) From 384b990e237d6c0b311042301ed0252dc689569f Mon Sep 17 00:00:00 2001 From: Koyo Yoshida Date: Fri, 15 Jan 2016 04:25:08 +0900 Subject: [PATCH 4/4] Fixed PR comments. --- .../apache/spark/ui/exec/ExecutorThreadDumpPage.scala | 4 +++- .../scala/org/apache/spark/ui/jobs/PoolPage.scala | 11 ++++++++--- .../scala/org/apache/spark/ui/jobs/PoolTable.scala | 3 +-- .../test/scala/org/apache/spark/ui/UIUtilsSuite.scala | 8 ++++---- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index 5c0876530c58b..edc66709e229a 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -29,7 +29,9 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage private val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { - val executorId = Option(request.getParameter("executorId")).getOrElse { + val executorId = Option(request.getParameter("executorId")).map { executorId => + UIUtils.decodeURLParameter(executorId) + }.getOrElse { throw new IllegalArgumentException(s"Missing executorId parameter") } val time = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 757df6b68fd7f..6cd25919ca5fd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -31,8 +31,11 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { - val poolName = UIUtils.decodeURLParameter(request.getParameter("poolname")) - require(poolName != null && poolName.nonEmpty, "Missing poolname parameter") + val poolName = Option(request.getParameter("poolname")).map { poolname => + UIUtils.decodeURLParameter(poolname) + }.getOrElse { + throw new IllegalArgumentException(s"Missing poolname parameter") + } val poolToActiveStages = listener.poolToActiveStages val activeStages = poolToActiveStages.get(poolName) match { @@ -44,7 +47,9 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { killEnabled = parent.killEnabled) // For now, pool information is only accessible in live UIs - val pools = sc.map(_.getPoolForName(poolName).get).toSeq + val pools = sc.map(_.getPoolForName(poolName).getOrElse { + throw new IllegalArgumentException(s"Unknown poolname: $poolName") + }).toSeq val poolTable = new PoolTable(pools, parent) val content = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 6965ce1254c18..ea02968733cac 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -61,8 +61,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { case None => 0 } val href = "%s/stages/pool?poolname=%s" - .format(UIUtils.prependBaseUri(parent.basePath), - URLEncoder.encode(p.name, "UTF-8")) + .format(UIUtils.prependBaseUri(parent.basePath), URLEncoder.encode(p.name, "UTF-8")) {p.name} diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala index 7e532fd791da1..bc8a5d494dbd3 100644 --- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala @@ -73,12 +73,12 @@ class UIUtilsSuite extends SparkFunSuite { val encoded2 = "%253Cdriver%253E" val decoded2 = "" - assert(decoded1.sameElements(decodeURLParameter(encoded1))) - assert(decoded2.sameElements(decodeURLParameter(encoded2))) + assert(decoded1 === decodeURLParameter(encoded1)) + assert(decoded2 === decodeURLParameter(encoded2)) // verify that no affect to decoded URL. - assert(decoded1.sameElements(decodeURLParameter(decoded1))) - assert(decoded2.sameElements(decodeURLParameter(decoded2))) + assert(decoded1 === decodeURLParameter(decoded1)) + assert(decoded2 === decodeURLParameter(decoded2)) } private def verify(