[SPARK-13139][SQL] Parse Hive DDL commands ourselves #11573

Closed · wants to merge 17 commits
Changes from 6 commits
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -784,6 +784,15 @@ class SQLContext private[sql](
DataFrame(this, parseSql(sqlText))
}

/**
* Executes a SQL query without parsing it, but instead passing it directly to an underlying
* system to process. This is currently only used for Hive DDLs and will be removed as soon
* as Spark can parse all supported Hive DDLs itself.
*/
private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
throw new UnsupportedOperationException
}

/**
* Returns the specified table as a [[DataFrame]].
*
@@ -29,7 +29,28 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
import ParserUtils._

/** Check if a command should not be explained. */
-protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+protected def isNoExplainCommand(command: String): Boolean = {
+  "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+}

protected def extractProps(
node: ASTNode,
firstLevelProp: String,
secondLevelProp: String): Seq[(String, String)] = {
node match {
case Token(x, options) if x == firstLevelProp =>
options.map {
case Token(y, keysAndValue) if y == secondLevelProp =>
val key = keysAndValue.init.map(x => unquoteString(x.text)).mkString(".")
val value = unquoteString(keysAndValue.last.text)
(key, value)
case _ =>
throw new AnalysisException(s"Expected property '$secondLevelProp' in '${node.text}'")
}
case _ =>
throw new AnalysisException(s"Expected property '$firstLevelProp' in '${node.text}'")
}
}
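As a rough standalone illustration of what extractProps does, the sketch below flattens a DBPROPERTIES-style subtree into dotted key/value pairs. Here Token is a minimal stand-in for catalyst's ASTNode and unquoteString is simplified; neither matches Spark's real classes exactly.

```scala
// Minimal stand-ins for catalyst's ASTNode/Token and ParserUtils.unquoteString;
// purely illustrative, not Spark's actual API.
case class Token(text: String, children: Seq[Token] = Nil)

def unquoteString(s: String): String = s.stripPrefix("'").stripSuffix("'")

// Same traversal shape as extractProps above, with sys.error standing in
// for AnalysisException.
def extractProps(
    node: Token,
    firstLevelProp: String,
    secondLevelProp: String): Seq[(String, String)] = node match {
  case Token(x, options) if x == firstLevelProp =>
    options.map {
      case Token(y, keysAndValue) if y == secondLevelProp =>
        // All nodes but the last form a dotted key; the last is the value.
        val key = keysAndValue.init.map(t => unquoteString(t.text)).mkString(".")
        val value = unquoteString(keysAndValue.last.text)
        (key, value)
      case other =>
        sys.error(s"Expected property '$secondLevelProp' in '${other.text}'")
    }
  case _ =>
    sys.error(s"Expected property '$firstLevelProp' in '${node.text}'")
}

// A property list like ('a'='1', 'b'.'c'='2') roughly becomes:
val props = extractProps(
  Token("TOK_DBPROPLIST", Seq(
    Token("TOK_TABLEPROPERTY", Seq(Token("'a'"), Token("'1'"))),
    Token("TOK_TABLEPROPERTY", Seq(Token("'b'"), Token("'c'"), Token("'2'"))))),
  "TOK_DBPROPLIST", "TOK_TABLEPROPERTY")
// props == Seq(("a", "1"), ("b.c", "2"))
```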

protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
node match {
@@ -64,6 +85,53 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)

case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: createDatabaseArgs) =>
val Seq(
allowExisting,
Contributor: Isn't allowExists the exact opposite of IF NOT EXISTS? I am asking because I have a similar case in my PR.

Member: I don't quite understand your question.

Contributor: When I see a parameter that is named allowExists, I think the existence of the current table is allowed, and that as a consequence the current CREATE TABLE command is going to overwrite the existing table. Why not name this ifNotExists?

Contributor (Author): I've renamed it.

dbLocation,
databaseComment,
dbprops) = getClauses(Seq(
"TOK_IFNOTEXISTS",
"TOK_DATABASELOCATION",
"TOK_DATABASECOMMENT",
"TOK_DATABASEPROPERTIES"), createDatabaseArgs)
val location = dbLocation.map {
case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
}
val comment = databaseComment.map {
case Token("TOK_DATABASECOMMENT", Token(comment, Nil) :: Nil) => unquoteString(comment)
}
val props: Map[String, String] = dbprops.toSeq.flatMap {
case Token("TOK_DATABASEPROPERTIES", propList) =>
propList.flatMap(extractProps(_, "TOK_DBPROPLIST", "TOK_TABLEPROPERTY"))
}.toMap
CreateDatabase(databaseName, allowExisting.isDefined, location, comment, props)(node.source)
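The getClauses helper used above pairs each expected clause token name with the matching optional child node. A much-simplified standalone sketch (Token stands in for catalyst's ASTNode; Spark's real helper in ParserUtils also enforces that every argument node is consumed):

```scala
// Token is a stand-in for catalyst's ASTNode; this simplified getClauses just
// looks each clause name up by text.
case class Token(text: String, children: Seq[Token] = Nil)

def getClauses(names: Seq[String], nodes: Seq[Token]): Seq[Option[Token]] =
  names.map(name => nodes.find(_.text == name))

// CREATE DATABASE IF NOT EXISTS demo COMMENT 'demo db' would yield argument
// nodes roughly like this (location and properties absent):
val args = Seq(
  Token("TOK_IFNOTEXISTS"),
  Token("TOK_DATABASECOMMENT", Seq(Token("'demo db'"))))

val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
  "TOK_IFNOTEXISTS",
  "TOK_DATABASELOCATION",
  "TOK_DATABASECOMMENT",
  "TOK_DATABASEPROPERTIES"), args)
// ifNotExists.isDefined == true; dbLocation == None
```

Absent clauses simply come back as None, which is why the surrounding code can call .map and .isDefined on each slot.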

case Token("TOK_CREATEFUNCTION", func :: as :: createFuncArgs) =>
Contributor: I am not sure this pattern works for functions with a dotted name. For example:

create function s.t as 'foo.bar.func'

produces the following AST:

TOK_CREATEFUNCTION 1, 0, 10, 16
:- s 1, 4, 4, 16
:- t 1, 6, 6, 18
+- 'foo.bar.func' 1, 10, 10, 23

Notice how the name is split into two top-level ASTNodes. I think we need to change the parser rule here.

Contributor (Author): Will fix shortly.

Contributor (Author): Fixed in latest commit.

val funcName = func.map(x => unquoteString(x.text)).mkString(".")
val asName = unquoteString(as.text)
val Seq(
rList,
temp) = getClauses(Seq(
"TOK_RESOURCE_LIST",
"TOK_TEMPORARY"), createFuncArgs)
val resourcesMap: Map[String, String] = rList.toSeq.flatMap {
case Token("TOK_RESOURCE_LIST", resources) =>
resources.map {
case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
val resourceType = rType match {
case Token("TOK_JAR", Nil) => "jar"
case Token("TOK_FILE", Nil) => "file"
case Token("TOK_ARCHIVE", Nil) => "archive"
}
(resourceType, unquoteString(rPath))
}
}.toMap
CreateFunction(funcName, asName, resourcesMap, temp.isDefined)(node.source)
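A tiny standalone illustration of the dotted-name handling discussed in the review thread above: with `create function s.t as 'foo.bar.func'`, the name arrives as two top-level nodes, and joining their texts with "." reassembles the qualified name. Token here is a stand-in for catalyst's ASTNode and unquoteString is simplified.

```scala
// Token stands in for catalyst's ASTNode; purely illustrative.
case class Token(text: String)

def unquoteString(s: String): String = s.stripPrefix("'").stripSuffix("'")

// The two top-level name nodes from the AST in the review thread.
val func = Seq(Token("s"), Token("t"))
val funcName = func.map(x => unquoteString(x.text)).mkString(".")
val asName = unquoteString(Token("'foo.bar.func'").text)
// funcName == "s.t"; asName == "foo.bar.func"
```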

case Token("TOK_ALTERTABLE", alterTableArgs) =>
AlterTableCommandParser.parse(node)

case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
@@ -90,17 +158,9 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl
case Token(name, Nil) => name
}.mkString(".")

-val options: Map[String, String] = tableOpts.toSeq.flatMap {
-  case Token("TOK_TABLEOPTIONS", options) =>
-    options.map {
-      case Token("TOK_TABLEOPTION", keysAndValue) =>
-        val key = keysAndValue.init.map(_.text).mkString(".")
-        val value = unquoteString(keysAndValue.last.text)
-        (key, value)
-    }
-}.toMap
-
-val asClause = tableAs.map(nodeToPlan(_))
+val options: Map[String, String] =
+  tableOpts.toSeq.flatMap(extractProps(_, "TOK_TABLEOPTIONS", "TOK_TABLEOPTION")).toMap
+val asClause = tableAs.map(nodeToPlan)

if (temp.isDefined && allowExisting.isDefined) {
throw new AnalysisException(