Skip to content

Commit

Permalink
fix bug in CTAS when table already existed
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghao-intel committed Nov 3, 2014
1 parent 2ebd1df commit 194113e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ trait Catalog {

def caseSensitive: Boolean

/**
 * Reports whether the table `tableName` is known to this catalog.
 *
 * @param db optional database name; how an absent value is interpreted is left to
 *           each implementation
 * @param tableName name of the table to look for
 * @return `true` if the table exists, `false` otherwise
 */
def tableExists(db: Option[String], tableName: String): Boolean

def lookupRelation(
databaseName: Option[String],
tableName: String,
Expand Down Expand Up @@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
tables.clear()
}

/**
 * Reports whether `tableName` has an entry in this catalog's `tables` map.
 *
 * The arguments are first passed through `processDatabaseAndTableName`; only the
 * table component of its result is used for the lookup, the database component
 * is not consulted.
 */
override def tableExists(db: Option[String], tableName: String): Boolean = {
  val (_, resolvedTable) = processDatabaseAndTableName(db, tableName)
  tables.get(resolvedTable).isDefined
}

override def lookupRelation(
databaseName: Option[String],
tableName: String,
Expand All @@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()

/**
 * Reports whether the table exists, checking `overrides` before the underlying catalog.
 *
 * The (database, table) pair produced by `processDatabaseAndTableName` is looked up
 * in `overrides`; only when no override is registered is the question delegated to
 * `super.tableExists` with the original, unprocessed arguments.
 */
abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
  val (resolvedDb, resolvedTable) = processDatabaseAndTableName(db, tableName)
  overrides.contains((resolvedDb, resolvedTable)) || super.tableExists(db, tableName)
}

abstract override def lookupRelation(
databaseName: Option[String],
tableName: String,
Expand Down Expand Up @@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {

val caseSensitive: Boolean = true

/**
 * Table existence checks are not supported by this catalog.
 *
 * @throws UnsupportedOperationException on every call
 */
def tableExists(db: Option[String], tableName: String): Boolean =
  throw new UnsupportedOperationException()

def lookupRelation(
databaseName: Option[String],
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

val caseSensitive: Boolean = false

/**
 * Reports whether the metastore client returns a table for the given name.
 *
 * When `db` is empty, the session's current database is used instead. The database
 * and table names are passed through `processDatabaseAndTableName` before the lookup.
 */
def tableExists(db: Option[String], tableName: String): Boolean = {
  val requestedDb = db.getOrElse(hive.sessionState.getCurrentDatabase)
  val (resolvedDb, resolvedTable) = processDatabaseAndTableName(requestedDb, tableName)
  // NOTE(review): the `false` argument is presumably Hive's "throwException" flag, so a
  // missing table comes back as null rather than an exception; confirm against the client API.
  Option(client.getTable(resolvedDb, resolvedTable, false)).isDefined
}

def lookupRelation(
db: Option[String],
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,17 @@ case class CreateTableAsSelect(
// TODO ideally, we should get the output data ready first and then
// add the relation into the catalog, in case a failure occurs during data
// processing.
sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
if (sc.catalog.tableExists(Some(database), tableName)) {
if (allowExisting) {
// The table already exists; do nothing, to stay consistent with Hive.
} else {
throw
new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
}
} else {
sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
}

Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
// expect the string => integer for field key cause the table ctas4 already existed.
// Do nothing because the table ctas4 already exists.
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
Expand All @@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
SELECT key, value
FROM src
ORDER BY key, value""").collect().toSeq)
intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
sql(
"""CREATE TABLE ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
}
checkAnswer(
sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)

checkExistence(sql("DESC EXTENDED ctas2"), true,
"name:key", "type:string", "name:value", "ctas2",
Expand Down

0 comments on commit 194113e

Please sign in to comment.