From ea99768afe0990d32383fc7a1039a27793db678e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 9 Mar 2016 14:22:48 -0800 Subject: [PATCH 1/4] Add spark.ui.threads to set the max number of the UI threads --- .../main/scala/org/apache/spark/ui/JettyUtils.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 6b3601250a654..b813cce09a025 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.{Connector, Request, Server} +import org.eclipse.jetty.server.{AbstractConnector, Connector, Request, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector @@ -270,9 +270,19 @@ private[spark] object JettyUtils extends Logging { gzipHandlers.foreach(collection.addHandler) connectors.foreach(_.setHost(hostName)) + // As each Acceptor will use one thread, the number of threads should at least be the number + // of acceptors plus 1. (See SPARK-13776) + var minThreads = 1 + connectors.collect { case c: AbstractConnector => c }.foreach { c => + // Limit the max acceptor number to 8 so that we don't waste a lot of threads + c.setAcceptors(Math.min(c.getAcceptors, 8)) + minThreads += c.getAcceptors + } server.setConnectors(connectors.toArray) val pool = new QueuedThreadPool + pool.setMaxThreads(Math.max(pool.getMaxThreads, minThreads)) + pool.setMinThreads(Math.min(pool.getMinThreads, pool.getMaxThreads)) pool.setDaemon(true) server.setThreadPool(pool) val errorHandler = new ErrorHandler() From 8422f6e1e5c8617dd8bffaa6c47429aadfc7bc10 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 10 Mar 2016 16:21:30 -0800 Subject: [PATCH 2/4] Consider the number of selector too --- .../org/apache/spark/ui/JettyUtils.scala | 20 ++++++++++++------- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b813cce09a025..596c2d02445cc 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -270,19 +270,25 @@ private[spark] object JettyUtils extends Logging { gzipHandlers.foreach(collection.addHandler) connectors.foreach(_.setHost(hostName)) - // As each Acceptor will use one thread, the number of threads should at least be the number - // of acceptors plus 1. (See SPARK-13776) + // As each acceptor and each selector will use one thread, the number of threads should at + // least be the number of acceptors and selectors plus 1. (See SPARK-13776) var minThreads = 1 - connectors.collect { case c: AbstractConnector => c }.foreach { c => + connectors.foreach { c => + // Currently we only use "SelectChannelConnector" + val connector = c.asInstanceOf[SelectChannelConnector] // Limit the max acceptor number to 8 so that we don't waste a lot of threads - c.setAcceptors(Math.min(c.getAcceptors, 8)) - minThreads += c.getAcceptors + connector.setAcceptors(math.min(connector.getAcceptors, 8)) + // The number of selectors always equals to the number of acceptors + minThreads += connector.getAcceptors * 2 } server.setConnectors(connectors.toArray) val pool = new QueuedThreadPool - pool.setMaxThreads(Math.max(pool.getMaxThreads, minThreads)) - pool.setMinThreads(Math.min(pool.getMinThreads, pool.getMaxThreads)) + if (serverName.nonEmpty) { + pool.setName(serverName) + } + pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) + pool.setMinThreads(math.min(pool.getMinThreads, pool.getMaxThreads)) pool.setDaemon(true) server.setThreadPool(pool) val errorHandler = new ErrorHandler() diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 23d312525f2be..19dd6a10f3508 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -134,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - var host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") + val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) logInfo("Bound %s to %s, and started at http://%s:%d".format(className, host, publicHostName, boundPort)) From e772aed2f7e419a29890f924f98a76bbc90cfe54 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 11 Mar 2016 13:06:16 -0800 Subject: [PATCH 3/4] Remove 'setMinThreads' --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 596c2d02445cc..1c4a332550064 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -288,7 +288,6 @@ private[spark] object JettyUtils extends Logging { pool.setName(serverName) } pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) - pool.setMinThreads(math.min(pool.getMinThreads, pool.getMaxThreads)) pool.setDaemon(true) server.setThreadPool(pool) val errorHandler = new ErrorHandler() From 068887b64ce9be531814b7c4f385dd24dce80c20 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 15 Mar 2016 13:21:37 -0700 Subject: [PATCH 4/4] Revert an irrelevant change --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 19dd6a10f3508..23d312525f2be 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -134,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") + var host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) logInfo("Bound %s to %s, and started at http://%s:%d".format(className, host, publicHostName, boundPort))