[SPARK-2919] [SQL] Basic support for analyze command in HiveQl #1848

Status: Closed (wants to merge 2 commits)
Changes from 1 commit
21 changes: 18 additions & 3 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -46,6 +46,8 @@ private[hive] case class AddFile(filePath: String) extends Command

private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

private[hive] case class AnalyzeTable(tableName: String) extends Command

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
@@ -74,7 +76,6 @@ private[hive] object HiveQl {
"TOK_CREATEFUNCTION",
"TOK_DROPFUNCTION",

"TOK_ANALYZE",
"TOK_ALTERDATABASE_PROPERTIES",
"TOK_ALTERINDEX_PROPERTIES",
"TOK_ALTERINDEX_REBUILD",
@@ -92,7 +93,6 @@
"TOK_ALTERTABLE_SKEWED",
"TOK_ALTERTABLE_TOUCH",
"TOK_ALTERTABLE_UNARCHIVE",
"TOK_ANALYZE",
"TOK_CREATEDATABASE",
"TOK_CREATEFUNCTION",
"TOK_CREATEINDEX",
@@ -239,7 +239,6 @@
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)

if (nativeCommands contains tree.getText) {
NativeCommand(sql)
} else {
@@ -387,6 +386,22 @@
ifExists) =>
val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
DropTable(tableName, ifExists.nonEmpty)
// Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
case Token("TOK_ANALYZE",
Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
isNoscan) =>
// Reference:
// https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
if (partitionSpec.nonEmpty) {
// Analyze partitions will be treated as a Hive native command.
NativePlaceholder
} else if (isNoscan.isEmpty) {
// If users do not specific "noscan", it will be treated as a Hive native command.
[Review comment (Contributor): Nit: "specify"]

NativePlaceholder
} else {
val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
AnalyzeTable(tableName)
}
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
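For illustration only (not part of this patch), here is a minimal sketch of how the new TOK_ANALYZE case above routes the different ANALYZE variants, mirroring the parser test added in StatisticsSuite further down. It assumes the standard TestHive table `src`; because HiveQl is private[hive], the snippet would have to live in the org.apache.spark.sql.hive package, just like the test suite.

import org.apache.spark.sql.hive.HiveQl

// Parsing only: the choice between the new AnalyzeTable command and Hive's native
// ANALYZE handling is made from the AST, without touching the metastore.
Seq(
  "ANALYZE TABLE src COMPUTE STATISTICS noscan",                            // => AnalyzeTable
  "ANALYZE TABLE src COMPUTE STATISTICS",                                   // full scan => NativeCommand
  "ANALYZE TABLE src PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan"  // partitions => NativeCommand
).foreach { stmt =>
  println(s"$stmt => ${HiveQl.parseSql(stmt).getClass.getSimpleName}")
}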
@@ -83,6 +83,8 @@ private[hive] trait HiveStrategies {

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
@@ -23,6 +23,33 @@ import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.HiveContext

/**
* :: DeveloperApi ::
*
[Review comment (Contributor): Nit: I don't believe you are allowed to have a space here (it breaks our scaladoc hacks)]

* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
*
* Right now, it only supports Hive tables and it only updates the size of a Hive table
* in the Hive metastore.
*/
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

def hiveContext = sqlContext.asInstanceOf[HiveContext]

def output = Seq.empty

override protected[sql] lazy val sideEffectResult = {
hiveContext.analyze(tableName)
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
sideEffectResult
sparkContext.emptyRDD[Row]
}
}

/**
* :: DeveloperApi ::
* Drops a table from the metastore and removes it if it is cached.
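A usage sketch of the new AnalyzeTable command end to end, in the spirit of the "analyze MetastoreRelations" test below. Assumptions: a TestHive session, a hypothetical table named `analyzeMe`, and code living in the org.apache.spark.sql.hive package so that the protected[sql] `catalog` field is visible.

import org.apache.spark.sql.hive.test.TestHive._

sql("CREATE TABLE analyzeMe (key INT, value STRING)")
sql("INSERT OVERWRITE TABLE analyzeMe SELECT * FROM src")

// Before ANALYZE, the relation falls back to the default size estimate.
println(catalog.lookupRelation(None, "analyzeMe").statistics.sizeInBytes)

// The noscan variant is routed to the new AnalyzeTable command, which refreshes
// the table's size in the Hive metastore; the next lookup reflects the real size.
sql("ANALYZE TABLE analyzeMe COMPUTE STATISTICS noscan")
println(catalog.lookupRelation(None, "analyzeMe").statistics.sizeInBytes)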
@@ -19,13 +19,54 @@ package org.apache.spark.sql.hive

import scala.reflect.ClassTag


import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

class StatisticsSuite extends QueryTest {

test("parse analyze commands") {
def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
val parsed = HiveQl.parseSql(analyzeCommand)
val operators = parsed.collect {
case a: AnalyzeTable => a
case o => o
}

assert(operators.size === 1)
if (operators(0).getClass() != c) {
fail(
s"""$analyzeCommand expected command: $c, but got ${operators(0)}
|parsed command:
|$parsed
""".stripMargin)
}
}

assertAnalyzeCommand(
"ANALYZE TABLE Table1 COMPUTE STATISTICS",
classOf[NativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
classOf[NativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
classOf[NativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
classOf[NativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
classOf[NativeCommand])

assertAnalyzeCommand(
"ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
classOf[AnalyzeTable])
}

test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
catalog.lookupRelation(None, tableName).statistics.sizeInBytes
@@ -37,7 +78,7 @@ class StatisticsSuite extends QueryTest {

assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)

analyze("analyzeTable")
sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")

assert(queryTotalSize("analyzeTable") === BigInt(11624))

@@ -66,7 +107,7 @@

assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)

analyze("analyzeTable_part")
sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")

assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
