-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-13139][SQL] Parse Hive DDL commands ourselves #11573
Changes from all commits
fc3c168
adcb561
010afdd
0079074
02de9b7
a663b5c
d79963a
042222d
6c3f994
794ae78
63e99c3
307a588
37854fc
6ad8dd5
074e3b4
7f3281e
bd91b0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
import ParserUtils._ | ||
|
||
/** Check if a command should not be explained. */ | ||
protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command | ||
protected def isNoExplainCommand(command: String): Boolean = { | ||
"TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command | ||
} | ||
|
||
/** | ||
* For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value'] | ||
* into a pair (key1.key2.key3, value). | ||
*/ | ||
private def extractProps( | ||
props: Seq[ASTNode], | ||
expectedNodeText: String): Seq[(String, String)] = { | ||
props.map { | ||
case Token(x, keysAndValue) if x == expectedNodeText => | ||
val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".") | ||
val value = unquoteString(keysAndValue.last.text) | ||
(key, value) | ||
case p => | ||
parseFailed(s"Expected property '$expectedNodeText' in command", p) | ||
} | ||
} | ||
|
||
protected override def nodeToPlan(node: ASTNode): LogicalPlan = { | ||
node match { | ||
|
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
val tableIdent = extractTableIdent(nameParts) | ||
RefreshTable(tableIdent) | ||
|
||
// CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] | ||
// [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]; | ||
case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) => | ||
val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq( | ||
"TOK_IFNOTEXISTS", | ||
"TOK_DATABASELOCATION", | ||
"TOK_DATABASECOMMENT", | ||
"TOK_DATABASEPROPERTIES"), args) | ||
val location = dbLocation.map { | ||
case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc) | ||
case _ => parseFailed("Invalid CREATE DATABASE command", node) | ||
} | ||
val comment = databaseComment.map { | ||
case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com) | ||
case _ => parseFailed("Invalid CREATE DATABASE command", node) | ||
} | ||
val props = dbprops.toSeq.flatMap { | ||
case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) => | ||
extractProps(propList, "TOK_TABLEPROPERTY") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it will be also good to provide an example format like what we have for |
||
case _ => parseFailed("Invalid CREATE DATABASE command", node) | ||
}.toMap | ||
CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source) | ||
|
||
// CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name | ||
// [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ]; | ||
case Token("TOK_CREATEFUNCTION", args) => | ||
// Example format: | ||
// | ||
// TOK_CREATEFUNCTION | ||
// :- db_name | ||
// :- func_name | ||
// :- alias | ||
// +- TOK_RESOURCE_LIST | ||
// :- TOK_RESOURCE_URI | ||
// : :- TOK_JAR | ||
// : +- '/path/to/jar' | ||
// +- TOK_RESOURCE_URI | ||
// :- TOK_FILE | ||
// +- 'path/to/file' | ||
val (funcNameArgs, otherArgs) = args.partition { | ||
case Token("TOK_RESOURCE_LIST", _) => false | ||
case Token("TOK_TEMPORARY", _) => false | ||
case Token(_, Nil) => true | ||
case _ => parseFailed("Invalid CREATE FUNCTION command", node) | ||
} | ||
// If database name is specified, there are 3 tokens, otherwise 2. | ||
val (funcName, alias) = funcNameArgs match { | ||
case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil => | ||
(unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname)) | ||
case Token(fname, Nil) :: Token(aname, Nil) :: Nil => | ||
(unquoteString(fname), unquoteString(aname)) | ||
case _ => | ||
parseFailed("Invalid CREATE FUNCTION command", node) | ||
} | ||
// Extract other keywords, if they exist | ||
val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs) | ||
val resourcesMap = rList.toSeq.flatMap { | ||
case Token("TOK_RESOURCE_LIST", resources) => | ||
resources.map { | ||
case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) => | ||
val resourceType = rType match { | ||
case Token("TOK_JAR", Nil) => "jar" | ||
case Token("TOK_FILE", Nil) => "file" | ||
case Token("TOK_ARCHIVE", Nil) => "archive" | ||
case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node) | ||
} | ||
(resourceType, unquoteString(rPath)) | ||
case _ => parseFailed("Invalid CREATE FUNCTION command", node) | ||
} | ||
case _ => parseFailed("Invalid CREATE FUNCTION command", node) | ||
}.toMap | ||
CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What will this map look like if I have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also have a test for this case. |
||
|
||
case Token("TOK_ALTERTABLE", alterTableArgs) => | ||
AlterTableCommandParser.parse(node) | ||
|
||
case Token("TOK_CREATETABLEUSING", createTableArgs) => | ||
val Seq( | ||
temp, | ||
allowExisting, | ||
ifNotExists, | ||
Some(tabName), | ||
tableCols, | ||
Some(Token("TOK_TABLEPROVIDER", providerNameParts)), | ||
|
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
"TOK_TABLEPROVIDER", | ||
"TOK_TABLEOPTIONS", | ||
"TOK_QUERY"), createTableArgs) | ||
|
||
val tableIdent: TableIdentifier = extractTableIdent(tabName) | ||
|
||
val columns = tableCols.map { | ||
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField)) | ||
case _ => parseFailed("Invalid CREATE TABLE command", node) | ||
} | ||
|
||
val provider = providerNameParts.map { | ||
case Token(name, Nil) => name | ||
case _ => parseFailed("Invalid CREATE TABLE command", node) | ||
}.mkString(".") | ||
|
||
val options: Map[String, String] = tableOpts.toSeq.flatMap { | ||
case Token("TOK_TABLEOPTIONS", options) => | ||
options.map { | ||
case Token("TOK_TABLEOPTION", keysAndValue) => | ||
val key = keysAndValue.init.map(_.text).mkString(".") | ||
val value = unquoteString(keysAndValue.last.text) | ||
(key, value) | ||
} | ||
val options = tableOpts.toSeq.flatMap { | ||
case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION") | ||
case _ => parseFailed("Invalid CREATE TABLE command", node) | ||
}.toMap | ||
val asClause = tableAs.map(nodeToPlan) | ||
|
||
val asClause = tableAs.map(nodeToPlan(_)) | ||
|
||
if (temp.isDefined && allowExisting.isDefined) { | ||
if (temp.isDefined && ifNotExists.isDefined) { | ||
throw new AnalysisException( | ||
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") | ||
} | ||
|
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
"a CREATE TABLE AS SELECT statement does not allow column definitions.") | ||
} | ||
|
||
val mode = if (allowExisting.isDefined) { | ||
val mode = if (ifNotExists.isDefined) { | ||
SaveMode.Ignore | ||
} else if (temp.isDefined) { | ||
SaveMode.Overwrite | ||
|
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly | |
provider, | ||
temp.isDefined, | ||
options, | ||
allowExisting.isDefined, | ||
ifNotExists.isDefined, | ||
managedIfNoPath = false) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it is good to provide a concrete example as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, what is the difference between this method and
extractTableProps
defined for alter table?