Commit

Fix some bugs for hive-0.12
tianyi committed Jan 19, 2015
1 parent 3cca2fb commit 703e331
Showing 4 changed files with 28 additions and 22 deletions.
@@ -61,7 +61,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
private def generateSQLStatsTable(): Seq[Node] = {
val numBatches = listener.executeList.size
val table = if (numBatches > 0) {
val headerRow = Seq("User", "JobID", "Start Time", "Finish Time", "Duration",
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
"Statement", "State", "Detail")
val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse

@@ -70,10 +70,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
.format(UIUtils.prependBaseUri(parent.basePath), info.jobId)
val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan
<tr>
- <td>{info.session.getUsername}</td>
+ <td>{info.userName}</td>
<td>
<a href={detailUrl}>{info.jobId}</a>
</td>
+ <td>{info.groupId}</td>
<td>{formatDate(info.startTimestamp)}</td>
<td>{formatDate(info.finishTimestamp)}</td>
<td>{formatDurationOption(Some(info.totalTime))}</td>
@@ -131,15 +132,16 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
val dataRows =
listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{
Seq(
- session.session.getUsername,
+ session.userName,
+ session.ip,
session.sessionID,
formatDate(session.startTimestamp),
formatDate(session.finishTimestamp),
formatDurationOption(Some(session.totalTime)),
session.totalExecute.toString
)
}).toSeq
val headerRow = Seq("User", "Session ID", "Start Time", "Finish Time", "Duration",
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
Some(listingTable(headerRow, dataRows))
} else {
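For orientation, here is a minimal, self-contained sketch (not code from this commit) of what the widened listing tables now carry: the SQL statistics table gains a "GroupID" column and the session table gains an "IP" column, so every data row has to emit a matching cell. `ExecRow`, `SessionRow`, and the plain-text rendering are illustrative stand-ins for the page's `ExecutionInfo`/`SessionInfo` and its XML table helpers.

```scala
// Hypothetical stand-ins for the UI's row types; only the new columns matter here.
object ListingSketch {
  case class ExecRow(user: String, jobId: String, groupId: String, statement: String)
  case class SessionRow(user: String, ip: String, sessionId: String)

  // Header rows shaped like the ones changed above.
  val execHeader = Seq("User", "JobID", "GroupID", "Statement")
  val sessionHeader = Seq("User", "IP", "Session ID")

  // Render a header plus data rows as a simple pipe-separated table.
  def render(header: Seq[String], rows: Seq[Seq[String]]): String =
    (header +: rows).map(_.mkString(" | ")).mkString("\n")

  def main(args: Array[String]): Unit = {
    val execs = Seq(ExecRow("alice", "42", "group-7", "SELECT 1"))
    val sessions = Seq(SessionRow("alice", "10.0.0.5", "b3f1-session"))
    println(render(execHeader, execs.map(e => Seq(e.user, e.jobId, e.groupId, e.statement))))
    println(render(sessionHeader, sessions.map(s => Seq(s.user, s.ip, s.sessionId))))
  }
}
```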
@@ -28,7 +28,7 @@ private[thriftserver] trait ThriftServerEventListener {
/**
* Called when a session created.
*/
- def onSessionCreated(session: HiveSession) { }
+ def onSessionCreated(ip: String, session: HiveSession) { }

/**
* Called when a session closed.
@@ -42,7 +42,7 @@ private[thriftserver] trait ThriftServerEventListener {
/**
* Called when a statement completed compilation.
*/
- def onStatementParse(id: String, executePlan: String, groupId: String) { }
+ def onStatementParse(id: String, executePlan: String) { }
/**
* Called when a statement got a error during running.
*/
@@ -53,8 +53,12 @@
def onStatementFinish(id: String) { }
}

- private[thriftserver] class SessionInfo(val session: HiveSession, val startTimestamp: Long) {
+ private[thriftserver] class SessionInfo(
+ val session: HiveSession,
+ val startTimestamp: Long,
+ val ip: String) {
val sessionID = session.getSessionHandle.getSessionId.toString
+ val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName
var finishTimestamp = 0L
var totalExecute = 0

@@ -76,12 +80,13 @@ private[thriftserver] class ExecutionInfo(
val statement: String,
val session: HiveSession,
val startTimestamp: Long) {
+ val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName
var finishTimestamp = 0L
var executePlan = ""
var detail = ""
var state: ExecutionState.Value = ExecutionState.STARTED
var groupId = ""
var jobId = ""
var groupId = ""
def totalTime = {
if (finishTimestamp == 0L) {
System.currentTimeMillis() - startTimestamp
@@ -107,25 +112,26 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf)
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
val jobGroup = for (
props <- Option(jobStart.properties);
- group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
- ) yield group
+ statement <- Option(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+ ) yield statement

jobGroup match {
- case Some(groupId: String) => {
+ case Some(statement: String) => {
val ret = executeList.find( _ match {
case (id: String, info: ExecutionInfo) => {
- info.groupId == groupId
+ info.statement == statement
}
})
if(ret.isDefined) {
ret.get._2.jobId = jobStart.jobId.toString
+ ret.get._2.groupId = jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID,"")
}
}
}
}

- override def onSessionCreated(session: HiveSession): Unit = {
- val info = new SessionInfo(session, System.currentTimeMillis())
+ override def onSessionCreated(ip: String, session: HiveSession): Unit = {
+ val info = new SessionInfo(session, System.currentTimeMillis(), ip)
sessionList(session.getSessionHandle) = info
trimSessionIfNecessary()
}
@@ -143,9 +149,8 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf)
totalRunning += 1
}

- override def onStatementParse(id: String, executePlan: String, groupId: String): Unit = {
+ override def onStatementParse(id: String, executePlan: String): Unit = {
executeList(id).executePlan = executePlan
- executeList(id).groupId = groupId
executeList(id).state = ExecutionState.COMPILED
}

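A hedged sketch of the new matching logic in `onJobStart` (simplified, not the commit's code): instead of keying executions by group id at parse time, the listener now looks a running execution up by its statement text, taken from the Spark job description property, and records the group id from the job's properties once the job actually starts. `ExecInfo` is a stand-in for `ExecutionInfo`, and the literal property keys below are assumed to mirror the `SparkContext` constants.

```scala
import java.util.Properties

// Simplified sketch: match a started Spark job back to a ThriftServer execution
// by its statement (job description) rather than by group id.
object JobStartSketch {
  // Stand-in for ExecutionInfo; only the fields touched by onJobStart.
  class ExecInfo(val statement: String) {
    var jobId = ""
    var groupId = ""
  }

  // Assumed string values of SparkContext.SPARK_JOB_DESCRIPTION / SPARK_JOB_GROUP_ID.
  val SPARK_JOB_DESCRIPTION = "spark.job.description"
  val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

  def onJobStart(jobId: Int, props: Properties, executeList: Map[String, ExecInfo]): Unit = {
    for {
      stmt <- Option(props.getProperty(SPARK_JOB_DESCRIPTION))
      info <- executeList.values.find(_.statement == stmt)
    } {
      info.jobId = jobId.toString
      info.groupId = props.getProperty(SPARK_JOB_GROUP_ID, "")
    }
  }

  def main(args: Array[String]): Unit = {
    val info = new ExecInfo("SELECT 1")
    val props = new Properties()
    props.setProperty(SPARK_JOB_DESCRIPTION, "SELECT 1")
    props.setProperty(SPARK_JOB_GROUP_ID, "group-7")
    onJobStart(3, props, Map("op-1" -> info))
    println(s"jobId=${info.jobId}, groupId=${info.groupId}")
  }
}
```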
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.thriftserver
import java.sql.{Date, Timestamp}
import java.util.{ArrayList => JArrayList, Map => JMap, UUID}

- import org.apache.hive.service.cli.thrift.TProtocolVersion
-
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}

@@ -76,7 +74,7 @@ private[hive] class SparkSQLSessionManagerShim(hiveContext: HiveContext) extends
delegationToken: java.lang.String): SessionHandle = {
val ret = super.openSession(username, password,
sessionConf, withImpersonation, delegationToken)
- SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret))
+ SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret))
ret
}
}
@@ -218,7 +216,7 @@ private[hive] class SparkExecuteStatementOperation(
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
- SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId)
+ SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -78,7 +78,8 @@ private[hive] class SparkSQLSessionManagerShim(
delegationToken: java.lang.String): SessionHandle = {
val ret = super.openSession(protocol, username, password,
sessionConf, withImpersonation, delegationToken)
- SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret))
+ val session = super.getSession(ret)
+ SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", session)
ret
}
}
@@ -192,7 +193,7 @@ private[hive] class SparkExecuteStatementOperation(
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
- SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId)
+ SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
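As a rough illustration of the call pattern the two session-manager and operation shims now follow (simplified, not the commit's code): the shim passes a placeholder "UNKNOWN" client IP because the client address is not available at `openSession` time, and the operation reports only the statement id and query plan to `onStatementParse`. `Listener` and `Session` below are stand-ins for `ThriftServerEventListener` and `HiveSession`.

```scala
// Simplified stand-ins; the real trait takes a HiveSession and lives in the
// ThriftServer event listener shown above.
object ListenerSketch {
  case class Session(user: String)

  trait Listener {
    def onSessionCreated(ip: String, session: Session): Unit = {}
    def onStatementParse(id: String, executePlan: String): Unit = {}
  }

  // A trivial listener implementation that just logs the callbacks.
  object LoggingListener extends Listener {
    override def onSessionCreated(ip: String, session: Session): Unit =
      println(s"session created for ${session.user} from $ip")
    override def onStatementParse(id: String, executePlan: String): Unit =
      println(s"statement $id compiled:\n$executePlan")
  }

  def main(args: Array[String]): Unit = {
    val session = Session("alice")
    // The shims cannot see the client address here, so they pass "UNKNOWN".
    LoggingListener.onSessionCreated("UNKNOWN", session)
    LoggingListener.onStatementParse("op-1", "== Physical Plan ==\n...")
  }
}
```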
