Skip to content

Commit

Permalink
fix some issues
Browse files Browse the repository at this point in the history
  • Loading branch information
tianyi committed Jan 19, 2015
1 parent 703e331 commit daed3d1
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)

private[hive] val uiTab: Option[ThriftServerTab] =
if (hiveContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab())
Some(new ThriftServerTab(hiveContext.sparkContext))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import java.util.concurrent.Executors
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
import org.apache.hive.service.cli.SessionHandle

private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SparkSQLSessionManagerShim(hiveContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
val content =
generateBasicStats() ++ <br></br> ++
<h4>Total {listener.sessionList.size} session online,
Total {listener.totalRunning} sql running</h4> ++
generateSessionStatsTable() ++ generateSQLStatsTable()
generateBasicStats() ++
<br/> ++
<h4>
Total {listener.sessionList.size} session online,
Total {listener.totalRunning} sql running
</h4> ++
generateSessionStatsTable() ++
generateSQLStatsTable()
UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
}

Expand All @@ -59,8 +63,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""

/** Generate stats of batch statements of the thrift server program */
private def generateSQLStatsTable(): Seq[Node] = {
val numBatches = listener.executeList.size
val table = if (numBatches > 0) {
val numStatement = listener.executeList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
"Statement", "State", "Detail")
val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse
Expand Down Expand Up @@ -113,11 +117,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
+ details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{errorMessage}</pre>
</div>
<div class="stacktrace-details collapsed">
<pre>{errorMessage}</pre>
</div>
// scalastyle:on
} else {
""
Expand All @@ -130,7 +134,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
val numBatches = listener.sessionList.size
val table = if (numBatches > 0) {
val dataRows =
listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{
listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map ( session =>
Seq(
session.userName,
session.ip,
Expand All @@ -140,7 +144,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
formatDurationOption(Some(session.totalTime)),
session.totalExecute.toString
)
}).toSeq
).toSeq
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
Some(listingTable(headerRow, dataRows))
Expand All @@ -150,11 +154,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""

val content =
<h5>Session Statistics</h5> ++
<div>
<ul class="unstyled">
{table.getOrElse("No statistics have been generated yet.")}
</ul>
</div>
<div>
<ul class="unstyled">
{table.getOrElse("No statistics have been generated yet.")}
</ul>
</div>

content
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@ package org.apache.spark.sql.hive.thriftserver.ui
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
import org.apache.spark.ui.{SparkUI, SparkUITab}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.{SparkContext, Logging, SparkException}

/**
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
private[thriftserver] class ThriftServerTab()
extends SparkUITab(getSparkUI(), "ThriftServer") with Logging {
private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging {

val parent = getSparkUI()
val parent = getSparkUI(sparkContext)
val listener = SparkSQLEnv.sqlEventListener

attachPage(new ThriftServerPage(this))
parent.attachTab(this)
}

private[thriftserver] object ThriftServerTab {
def getSparkUI(): SparkUI = {
SparkSQLEnv.sparkContext.ui.getOrElse {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ private[thriftserver] trait ThriftServerEventListener {
/**
* Called when a statement started to run.
*/
def onStatementStart(id: String, session: HiveSession, statement: String) { }
def onStatementStart(id: String, session: HiveSession, statement: String, groupId: String) { }

/**
* Called when a statement completed compilation.
*/
def onStatementParse(id: String, executePlan: String) { }

/**
* Called when a statement got a error during running.
*/
def onStatementError(id: String, errorMessage: String, errorTrace: String) { }

/**
* Called when a statement ran success.
*/
Expand All @@ -58,7 +61,7 @@ private[thriftserver] class SessionInfo(
val startTimestamp: Long,
val ip: String) {
val sessionID = session.getSessionHandle.getSessionId.toString
val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName
val userName = if (session.getUserName == null) "UNKNOWN" else session.getUserName
var finishTimestamp = 0L
var totalExecute = 0

Expand Down Expand Up @@ -112,19 +115,19 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf)
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
val jobGroup = for (
props <- Option(jobStart.properties);
statement <- Option(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
statement <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
) yield statement

jobGroup match {
case Some(statement: String) => {
case Some(groupId: String) => {
val ret = executeList.find( _ match {
case (id: String, info: ExecutionInfo) => {
info.statement == statement
info.jobId == "" && info.groupId == groupId
}
})
if(ret.isDefined) {
ret.get._2.jobId = jobStart.jobId.toString
ret.get._2.groupId = jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID,"")
ret.get._2.groupId = groupId
}
}
}
Expand All @@ -140,12 +143,14 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf)
sessionList(session.getSessionHandle).finishTimestamp = System.currentTimeMillis()
}

override def onStatementStart(id: String, session: HiveSession, statement: String): Unit = {
override def onStatementStart(id: String, session: HiveSession,
statement: String, groupId:String): Unit = {
val info = new ExecutionInfo(statement, session, System.currentTimeMillis())
info.state = ExecutionState.STARTED
executeList(id) = info
trimExecutionIfNecessary()
sessionList(session.getSessionHandle).totalExecute += 1
executeList(id).groupId = groupId
totalRunning += 1
}

Expand All @@ -170,28 +175,16 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf)
private def trimExecutionIfNecessary() = synchronized {
if (executeList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1)
executeList.toList.sortWith(compareExecutionDesc).take(toRemove).foreach { s =>
executeList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s =>
executeList.remove(s._1)
}
}
}

private def compareExecutionDesc(
l: (String, ExecutionInfo),
r: (String, ExecutionInfo)): Boolean = {
l._2.startTimestamp < r._2.startTimestamp
}

private def compareSessionDesc(
l: (SessionHandle, SessionInfo),
r: (SessionHandle, SessionInfo)): Boolean = {
l._2.startTimestamp < r._2.startTimestamp
}

private def trimSessionIfNecessary() = synchronized {
if (sessionList.size > retainedSessions) {
val toRemove = math.max(retainedSessions / 10, 1)
sessionList.toList.sortWith(compareSessionDesc).take(toRemove).foreach { s =>
sessionList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s =>
sessionList.remove(s._1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.{SessionManager, HiveSession}

import org.apache.spark.Logging
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
Expand Down Expand Up @@ -202,7 +203,14 @@ private[hive] class SparkExecuteStatementOperation(
val sid = UUID.randomUUID().toString
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement)
val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match {
case groupId: String =>
hiveContext.sparkContext.setJobDescription(statement)
groupId
case _ => hiveContext.sparkContext.setJobGroup(sid, statement)
sid
}
SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
Expand All @@ -212,7 +220,6 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session._

import org.apache.spark.Logging
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
Expand Down Expand Up @@ -179,7 +179,14 @@ private[hive] class SparkExecuteStatementOperation(
val sid = UUID.randomUUID().toString
logInfo(s"Running query '$statement'")
setState(OperationState.RUNNING)
SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement)
val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match {
case groupId: String =>
hiveContext.sparkContext.setJobDescription(statement)
groupId
case _ => hiveContext.sparkContext.setJobGroup(sid, statement)
sid
}
SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
Expand All @@ -189,7 +196,6 @@ private[hive] class SparkExecuteStatementOperation(
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
hiveContext.sparkContext.setJobDescription(statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
Expand Down

0 comments on commit daed3d1

Please sign in to comment.