[SPARK-31709][SQL] Proper base path for database/table location when it is a relative path

### What changes were proposed in this pull request?

Currently, the user home directory is used as the base path for database and table locations when they are specified as relative paths, e.g.
```sql
> set spark.sql.warehouse.dir;
spark.sql.warehouse.dir	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/spark-warehouse/
spark-sql> create database loctest location 'loctestdbdir';

spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Owner	kentyao

spark-sql> create table loctest(id int) location 'loctestdbdir';
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	default
Table	loctest
Owner	kentyao
Created Time	Thu May 14 16:29:05 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
The user home directory is not always warehouse-related, cannot be changed at runtime, and is shared by both databases and tables as the parent directory. Meanwhile, we already use the table path as the parent directory for relative partition locations.

The config `spark.sql.warehouse.dir` represents `the default location for managed databases and tables`.
For databases, the case above does not follow these semantics: `spark.sql.warehouse.dir` should be used as the base path instead.

For tables, the semantics seem right, but I suggest enriching them so that the rule also covers external tables whose locations are specified as relative paths.

With the changes in this PR:

The location of a database will be `warehouseDir/dbPath` when `dbPath` is relative.
The location of a table will be `dbPath/tblPath` when `tblPath` is relative.
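
In code, the resolution rule boils down to the two-step qualification below — a minimal standalone sketch mirroring the new `makeQualifiedDBPath`/`makeQualifiedTablePath` helpers in the diff; the `resolve*` names are illustrative, and hadoop-common is assumed on the classpath:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Database: a relative location is resolved against spark.sql.warehouse.dir.
def resolveDbLocation(warehousePath: String, dbLocation: URI): URI =
  if (dbLocation.isAbsolute) dbLocation
  else new Path(warehousePath, dbLocation.toString).toUri

// Table: a relative location is resolved against the (already resolved)
// database location.
def resolveTableLocation(dbLocation: URI, tblLocation: URI): URI =
  if (tblLocation.isAbsolute) tblLocation
  else new Path(new Path(dbLocation), tblLocation.toString).toUri
```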

### Why are the changes needed?

Bug fix and improvement.

First, databases with relative locations should be created under the default location specified by `spark.sql.warehouse.dir`.

Second, external tables with relative paths should follow the same behavior for consistency.

Finally, databases, tables, and partitions with relative location paths should choose their base paths in the same way.
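
For illustration, the intended, consistent resolution chain looks like this (a sketch; `file:/warehouse` and all object names are hypothetical):

```sql
-- assuming spark.sql.warehouse.dir = file:/warehouse
CREATE DATABASE db1 LOCATION 'db1dir';
-- database location: file:/warehouse/db1dir
CREATE TABLE db1.t1 (id INT, p INT) USING parquet PARTITIONED BY (p) LOCATION 't1dir';
-- table location: file:/warehouse/db1dir/t1dir
ALTER TABLE db1.t1 ADD PARTITION (p = 0) LOCATION 'p0';
-- partition location (existing behavior): file:/warehouse/db1dir/t1dir/p0
```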

### Does this PR introduce _any_ user-facing change?

Yes, this PR changes the `createDatabase`, `alterDatabase`, `createTable` and `alterTable` APIs and related DDLs. If the LOCATION clause contains a relative path, the base path will be `spark.sql.warehouse.dir` for databases, and `spark.sql.warehouse.dir`/`dbPath` for tables.

e.g.

#### after
```sql
spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest
Owner	kentyao
spark-sql> use loctest;
spark-sql> create table loctest(id int) location 'loctest';
20/05/14 18:18:02 WARN InMemoryFileIndex: The directory file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/loctest was not found. Was it deleted very recently?
20/05/14 18:18:02 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
spark-sql> alter table loctest set location 'loctest2';
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest2
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
### How was this patch tested?

Add unit tests.

Closes #28527 from yaooqinn/SPARK-31709.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
yaooqinn authored and cloud-fan committed Aug 3, 2020
1 parent 42f9ee4 commit 3deb59d
Showing 3 changed files with 123 additions and 18 deletions.
SessionCatalog.scala
```diff
@@ -219,12 +219,20 @@ class SessionCatalog(
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
     externalCatalog.createDatabase(
-      dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
+      dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),
       ignoreIfExists)
   }
 
+  private def makeQualifiedDBPath(locationUri: URI): URI = {
+    if (locationUri.isAbsolute) {
+      locationUri
+    } else {
+      val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
+      makeQualifiedPath(fullPath.toUri)
+    }
+  }
+
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
     if (dbName == DEFAULT_DATABASE) {
@@ -241,7 +249,8 @@
   def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
     val dbName = formatDatabaseName(dbDefinition.name)
     requireDbExists(dbName)
-    externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
+    externalCatalog.alterDatabase(dbDefinition.copy(
+      name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
   }
 
   def getDatabaseMetadata(db: String): CatalogDatabase = {
@@ -283,8 +292,7 @@
    * by users.
    */
   def getDefaultDBPath(db: String): URI = {
-    val database = formatDatabaseName(db)
-    new Path(new Path(conf.warehousePath), database + ".db").toUri
+    CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
   }
 
   // ----------------------------------------------------------------------------
@@ -317,7 +325,7 @@
         && !tableDefinition.storage.locationUri.get.isAbsolute) {
       // make the location of the table qualified.
       val qualifiedTableLocation =
-        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+        makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
         identifier = tableIdentifier)
@@ -350,6 +358,16 @@
     }
   }
 
+  private def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
+    if (locationUri.isAbsolute) {
+      locationUri
+    } else {
+      val dbName = formatDatabaseName(database)
+      val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
+      new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
+    }
+  }
+
   /**
    * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    *
@@ -369,7 +387,7 @@
         && !tableDefinition.storage.locationUri.get.isAbsolute) {
       // make the location of the table qualified.
       val qualifiedTableLocation =
-        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+        makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
         identifier = tableIdentifier)
```
V2SessionCatalog.scala
```diff
@@ -126,11 +126,18 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
     val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
     val comment = properties.get(TableCatalog.PROP_COMMENT)
     val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+    val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+    val storage = if (location.isDefined) {
+      catalogTable.storage.copy(locationUri = location)
+    } else {
+      catalogTable.storage
+    }
 
     try {
       catalog.alterTable(
-        catalogTable
-          .copy(properties = properties, schema = schema, owner = owner, comment = comment))
+        catalogTable.copy(
+          properties = properties, schema = schema, owner = owner, comment = comment,
+          storage = storage))
     } catch {
       case _: NoSuchTableException =>
         throw new NoSuchTableException(ident)
```
V2SessionCatalogSuite.scala
```diff
@@ -17,17 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.net.URI
 import java.util
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -160,6 +162,36 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(catalog.tableExists(testIdent))
   }
 
+  private def makeQualifiedPathWithWarehouse(path: String): URI = {
+    val p = new Path(spark.sessionState.conf.warehousePath, path)
+    val fs = p.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(p).toUri
+
+  }
+
+  test("createTable: location") {
+    val catalog = newCatalog()
+    val properties = new util.HashMap[String, String]()
+    assert(!catalog.tableExists(testIdent))
+
+    // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t1.catalogTable.location ===
+      spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
+    catalog.dropTable(testIdent)
+
+    // relative path
+    properties.put(TableCatalog.PROP_LOCATION, "relative/path")
+    val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
+    catalog.dropTable(testIdent)
+
+    // absolute path
+    properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
+    val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    assert(t3.catalogTable.location.toString === "file:/absolute/path")
+  }
+
   test("tableExists") {
     val catalog = newCatalog()
 
@@ -640,6 +672,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(exc.message.contains("not found"))
   }
 
+  test("alterTable: location") {
+    val catalog = newCatalog()
+    assert(!catalog.tableExists(testIdent))
+
+    // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table]
+    assert(t1.catalogTable.location ===
+      spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
+
+    // relative path
+    val t2 = catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "relative/path")).asInstanceOf[V1Table]
+    assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
+
+    // absolute path
+    val t3 = catalog.alterTable(testIdent,
+      TableChange.setProperty(TableCatalog.PROP_LOCATION, "/absolute/path")).asInstanceOf[V1Table]
+    assert(t3.catalogTable.location.toString === "file:/absolute/path")
+  }
+
   test("dropTable") {
     val catalog = newCatalog()
 
@@ -812,11 +864,15 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
 
   test("createNamespace: basic behavior") {
     val catalog = newCatalog()
-    val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
+
+    val sessionCatalog = sqlContext.sessionState.catalog
+    val expectedPath =
+      new Path(spark.sessionState.conf.warehousePath,
+        sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString
 
     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
 
-    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)
 
     assert(catalog.namespaceExists(testNs) === true)
     val metadata = catalog.loadNamespaceMetadata(testNs).asScala
@@ -842,6 +898,23 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
     catalog.dropNamespace(testNs)
   }
 
+  test("createNamespace: relative location") {
+    val catalog = newCatalog()
+    val expectedPath =
+      new Path(spark.sessionState.conf.warehousePath, "a/b/c").toString
+
+    catalog.createNamespace(testNs, Map("location" -> "a/b/c").asJava)
+
+    assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    val metadata = catalog.loadNamespaceMetadata(testNs).asScala
+    checkMetadata(metadata, Map.empty)
+    assert(expectedPath === metadata("location"))
+
+    catalog.dropNamespace(testNs)
+  }
+
   test("createNamespace: fail if namespace already exists") {
     val catalog = newCatalog()
 
@@ -954,16 +1027,23 @@
 
   test("alterNamespace: update namespace location") {
     val catalog = newCatalog()
-    val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString
-    val newPath = "file:/tmp/db.db"
+    val initialPath =
+      new Path(spark.sessionState.conf.warehousePath,
+        spark.sessionState.catalog.getDefaultDBPath(testNs(0)).toString).toString
 
+    val newAbsoluteUri = "file:/tmp/db.db"
     catalog.createNamespace(testNs, emptyProps)
+    assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri)
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsoluteUri))
+    assert(newAbsoluteUri === spark.catalog.getDatabase(testNs(0)).locationUri)
 
-    assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
-
-    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath))
+    val newAbsolutePath = "/tmp/newAbsolutePath"
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newAbsolutePath))
+    assert("file:" + newAbsolutePath === spark.catalog.getDatabase(testNs(0)).locationUri)
 
-    assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString)
+    val newRelativePath = new Path(spark.sessionState.conf.warehousePath, "relativeP").toString
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
+    assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)
 
     catalog.dropNamespace(testNs)
   }
```
