[SPARK-13139][SQL] Parse Hive DDL commands ourselves #11573

Closed · wants to merge 17 commits
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.parser

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types._
@@ -29,6 +30,7 @@ import org.apache.spark.sql.types._
object ParserUtils {

object Token {
// Match on (text, children)
def unapply(node: ASTNode): Some[(String, List[ASTNode])] = {
CurrentOrigin.setPosition(node.line, node.positionInLine)
node.pattern
@@ -160,7 +162,14 @@
}

/**
* Throw an exception because we cannot parse the given node.
* Throw an exception because we cannot parse the given node for some unexpected reason.
*/
def parseFailed(msg: String, node: ASTNode): Nothing = {
throw new AnalysisException(s"$msg: '${node.source}'")
}

/**
* Throw an exception because there are no rules to parse the node.
*/
def noParseRule(msg: String, node: ASTNode): Nothing = {
throw new NotImplementedError(
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -784,6 +784,15 @@ class SQLContext private[sql](
Dataset.newDataFrame(this, parseSql(sqlText))
}

/**
* Executes a SQL query without parsing it, but instead passing it directly to an underlying
* system to process. This is currently only used for Hive DDLs and will be removed as soon
* as Spark can parse all supported Hive DDLs itself.
*/
private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
throw new UnsupportedOperationException
}

/**
* Returns the specified table as a [[DataFrame]].
*
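For context, a Hive-backed subclass is expected to override this hook and delegate the statement to its native client. A minimal sketch, assuming hypothetical names hiveClient and runSqlHive for whatever client the subclass actually holds (the real override is not part of this diff):

  // Hand the raw SQL string to the Hive client and wrap each returned line in a Row.
  private[sql] override def runNativeSql(sqlText: String): Seq[Row] = {
    hiveClient.runSqlHive(sqlText).map { resultLine => Row(resultLine) }
  }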
125 changes: 106 additions & 19 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
import ParserUtils._

/** Check if a command should not be explained. */
protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
protected def isNoExplainCommand(command: String): Boolean = {
"TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
}

/**
* For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
* into a pair (key1.key2.key3, value).
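* For example, a node whose children are ['key1', 'key2', 'key3', 'value'] becomes
* the pair ("key1.key2.key3", "value").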
*/
Contributor: Maybe it is good to provide a concrete example as well?

Contributor: Also, what is the difference between this method and extractTableProps defined for alter table?

private def extractProps(
props: Seq[ASTNode],
expectedNodeText: String): Seq[(String, String)] = {
props.map {
case Token(x, keysAndValue) if x == expectedNodeText =>
val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
val value = unquoteString(keysAndValue.last.text)
(key, value)
case p =>
parseFailed(s"Expected property '$expectedNodeText' in command", p)
}
}

protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
node match {
Expand Down Expand Up @@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)

// CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
// [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
"TOK_IFNOTEXISTS",
"TOK_DATABASELOCATION",
"TOK_DATABASECOMMENT",
"TOK_DATABASEPROPERTIES"), args)
val location = dbLocation.map {
case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
case _ => parseFailed("Invalid CREATE DATABASE command", node)
}
val comment = databaseComment.map {
case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
case _ => parseFailed("Invalid CREATE DATABASE command", node)
}
val props = dbprops.toSeq.flatMap {
case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
extractProps(propList, "TOK_TABLEPROPERTY")
Contributor: Maybe it would also be good to provide an example format like what we have for TOK_CREATEFUNCTION.

case _ => parseFailed("Invalid CREATE DATABASE command", node)
}.toMap
CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
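// For instance (an illustrative trace, not a case from the test suite):
//
//   CREATE DATABASE IF NOT EXISTS db1 COMMENT 'test' LOCATION '/path/to/db1'
//   WITH DBPROPERTIES ('k'='v')
//
// parses into:
//
//   CreateDatabase("db1", ifNotExists = true, Some("/path/to/db1"), Some("test"),
//     Map("k" -> "v"))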

// CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
// [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
case Token("TOK_CREATEFUNCTION", args) =>
// Example format:
//
//   TOK_CREATEFUNCTION
//   :- db_name
//   :- func_name
//   :- alias
//   +- TOK_RESOURCE_LIST
//      :- TOK_RESOURCE_URI
//      :  :- TOK_JAR
//      :  +- '/path/to/jar'
//      +- TOK_RESOURCE_URI
//         :- TOK_FILE
//         +- 'path/to/file'
val (funcNameArgs, otherArgs) = args.partition {
case Token("TOK_RESOURCE_LIST", _) => false
case Token("TOK_TEMPORARY", _) => false
case Token(_, Nil) => true
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}
// If database name is specified, there are 3 tokens, otherwise 2.
val (funcName, alias) = funcNameArgs match {
case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
(unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
(unquoteString(fname), unquoteString(aname))
case _ =>
parseFailed("Invalid CREATE FUNCTION command", node)
}
// Extract other keywords, if they exist
val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
val resourcesMap = rList.toSeq.flatMap {
case Token("TOK_RESOURCE_LIST", resources) =>
resources.map {
case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
val resourceType = rType match {
case Token("TOK_JAR", Nil) => "jar"
case Token("TOK_FILE", Nil) => "file"
case Token("TOK_ARCHIVE", Nil) => "archive"
case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
}
(resourceType, unquoteString(rPath))
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}
case _ => parseFailed("Invalid CREATE FUNCTION command", node)
}.toMap
CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
Contributor: What will this map look like if I have USING JAR 'jar1', JAR 'jar2', ...?

Contributor: Let's also have a test for this case.
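Note that because the (resourceType, path) pairs are collapsed with .toMap keyed on resource type, USING JAR 'jar1', JAR 'jar2' would leave only Map("jar" -> "jar2") under this implementation; the last path per resource type wins.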


case Token("TOK_ALTERTABLE", alterTableArgs) =>
AlterTableCommandParser.parse(node)

case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
allowExisting,
ifNotExists,
Some(tabName),
tableCols,
Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
"TOK_TABLEPROVIDER",
"TOK_TABLEOPTIONS",
"TOK_QUERY"), createTableArgs)

val tableIdent: TableIdentifier = extractTableIdent(tabName)

val columns = tableCols.map {
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
case _ => parseFailed("Invalid CREATE TABLE command", node)
}

val provider = providerNameParts.map {
case Token(name, Nil) => name
case _ => parseFailed("Invalid CREATE TABLE command", node)
}.mkString(".")

val options: Map[String, String] = tableOpts.toSeq.flatMap {
case Token("TOK_TABLEOPTIONS", options) =>
options.map {
case Token("TOK_TABLEOPTION", keysAndValue) =>
val key = keysAndValue.init.map(_.text).mkString(".")
val value = unquoteString(keysAndValue.last.text)
(key, value)
}
val options = tableOpts.toSeq.flatMap {
case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
case _ => parseFailed("Invalid CREATE TABLE command", node)
}.toMap
val asClause = tableAs.map(nodeToPlan)

val asClause = tableAs.map(nodeToPlan(_))

if (temp.isDefined && allowExisting.isDefined) {
if (temp.isDefined && ifNotExists.isDefined) {
throw new AnalysisException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}

val mode = if (allowExisting.isDefined) {
val mode = if (ifNotExists.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
provider,
temp.isDefined,
options,
allowExisting.isDefined,
ifNotExists.isDefined,
managedIfNoPath = false)
}

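To make the mode selection concrete: an illustrative CREATE TEMPORARY TABLE t USING parquet OPTIONS (path '/tmp/t') takes the temp branch and resolves to SaveMode.Overwrite, while CREATE TABLE IF NOT EXISTS t USING parquet resolves to SaveMode.Ignore. A plain CREATE TABLE presumably falls through to an error-on-exists default, though that branch is elided from this diff.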