diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index a349480359e0e..32c6f8f2cde16 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -24,6 +24,8 @@
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -49,6 +51,16 @@ public interface TableCatalog extends CatalogPlugin {
    */
   String PROP_COMMENT = "comment";
 
+  /**
+   * A property to specify the provider of the table.
+   */
+  String PROP_PROVIDER = "provider";
+
+  /**
+   * The list of reserved table properties.
+   */
+  List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
+
   /**
    * List the tables in a namespace from the catalog.
    *
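For reference, RESERVED_PROPERTIES gives connector code a single place to test whether a property key is reserved rather than user-defined. A minimal sketch of how a custom catalog might use it (the userProps helper is hypothetical, not part of this patch):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.connector.catalog.TableCatalog

    // Hypothetical helper: strip the reserved keys (comment, location,
    // provider) before persisting user-defined table properties.
    def userProps(props: java.util.Map[String, String]): Map[String, String] =
      props.asScala.toMap.filter { case (k, _) =>
        !TableCatalog.RESERVED_PROPERTIES.contains(k)
      }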

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index fa45d72062423..0dcd595ded191 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -252,7 +252,8 @@ private[sql] object CatalogV2Util {
         s"are both used to set the table comment, you can only specify one of them.")
     }
 
-    if (options.contains("provider") || properties.contains("provider")) {
+    if (options.contains(TableCatalog.PROP_PROVIDER)
+        || properties.contains(TableCatalog.PROP_PROVIDER)) {
       throw new AnalysisException(
         "USING and option/property 'provider' are both used to set the provider implementation, " +
           "you can only specify one of them.")
@@ -266,7 +267,7 @@ private[sql] object CatalogV2Util {
     tableProperties ++= filteredOptions
 
     // convert USING, LOCATION, and COMMENT clauses to table properties
-    tableProperties += ("provider" -> provider)
+    tableProperties += (TableCatalog.PROP_PROVIDER -> provider)
     comment.map(text => tableProperties += (TableCatalog.PROP_COMMENT -> text))
     location.orElse(options.get("path")).map(
       loc => tableProperties += (TableCatalog.PROP_LOCATION -> loc))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 42f145d5673d3..92515a0210c67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -535,7 +535,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           ident,
           partitionTransforms,
           df.queryExecution.analyzed,
-          Map("provider" -> source) ++ getLocationIfExists,
+          Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
           extraOptions.toMap,
           orCreate = true)  // Create the table if it doesn't exist
 
@@ -548,7 +548,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           ident,
           partitionTransforms,
           df.queryExecution.analyzed,
-          Map("provider" -> source) ++ getLocationIfExists,
+          Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
           extraOptions.toMap,
           ignoreIfExists = other == SaveMode.Ignore)
       }
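Using the constant keeps the USING-vs-property conflict check in sync with the writer paths. Grounded in the CatalogV2Util check above, a session with a v2 catalog (here called testcat, as in the test suite) now fails like this:

    // Setting the provider twice is rejected; the message is the one
    // thrown by CatalogV2Util above.
    spark.sql(
      """CREATE TABLE testcat.t (id BIGINT) USING parquet
        |TBLPROPERTIES ('provider'='orc')""".stripMargin)
    // => AnalysisException: USING and option/property 'provider' are both
    //    used to set the provider implementation, you can only specify one
    //    of them.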
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 8e601b50fdd4c..cf534ab6b9e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
+import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -128,7 +129,8 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
       identifier,
       partitioning.getOrElse(Seq.empty),
       logicalPlan,
-      properties = provider.map(p => properties + ("provider" -> p)).getOrElse(properties).toMap,
+      properties = provider.map(p => properties + (TableCatalog.PROP_PROVIDER -> p))
+        .getOrElse(properties).toMap,
       writeOptions = options.toMap,
       ignoreIfExists = false)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index a032f5c9d5684..2815b0ac131f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
-import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.{Table, TableCatalog}
 import org.apache.spark.sql.types.StructType
 
 case class DescribeTableExec(
@@ -36,14 +36,33 @@ case class DescribeTableExec(
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
     addSchema(rows)
+    addPartitioning(rows)
 
     if (isExtended) {
-      addPartitioning(rows)
-      addProperties(rows)
+      addTableDetails(rows)
     }
     rows
   }
 
+  private def addTableDetails(rows: ArrayBuffer[InternalRow]): Unit = {
+    rows += emptyRow()
+    rows += toCatalystRow("# Detailed Table Information", "", "")
+    rows += toCatalystRow("Name", table.name(), "")
+
+    TableCatalog.RESERVED_PROPERTIES.asScala.toList.foreach(propKey => {
+      if (table.properties.containsKey(propKey)) {
+        rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
+      }
+    })
+    val properties =
+      table.properties.asScala.toList
+        .filter(kv => !TableCatalog.RESERVED_PROPERTIES.contains(kv._1))
+        .sortBy(_._1).map {
+          case (key, value) => key + "=" + value
+        }.mkString("[", ",", "]")
+    rows += toCatalystRow("Table Properties", properties, "")
+  }
+
   private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
     rows ++= table.schema.map{ column =>
       toCatalystRow(
@@ -53,8 +72,7 @@ case class DescribeTableExec(
 
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
     rows += emptyRow()
-    rows += toCatalystRow(" Partitioning", "", "")
-    rows += toCatalystRow("--------------", "", "")
+    rows += toCatalystRow("# Partitioning", "", "")
     if (table.partitioning.isEmpty) {
       rows += toCatalystRow("Not partitioned", "", "")
     } else {
@@ -64,15 +82,6 @@ case class DescribeTableExec(
     }
   }
 
-  private def addProperties(rows: ArrayBuffer[InternalRow]): Unit = {
-    rows += emptyRow()
-    rows += toCatalystRow(" Table Property", " Value", "")
-    rows += toCatalystRow("----------------", "-------", "")
-    rows ++= table.properties.asScala.toList.sortBy(_._1).map {
-      case (key, value) => toCatalystRow(key, value, "")
-    }
-  }
-
   private def emptyRow(): InternalRow = toCatalystRow("", "", "")
 
   private def toCatalystRow(strs: String*): InternalRow = {
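With this change, plain DESCRIBE prints schema plus partitioning, and EXTENDED appends the detailed section. The expected shape, taken from the updated DataSourceV2SQLSuite expectations below (the table and property values are the suite's, not a general requirement):

    spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name").show(truncate = false)
    // id    bigint
    // data  string
    //
    // # Partitioning
    // Part 0   id
    //
    // # Detailed Table Information
    // Name              testcat.table_name
    // Comment           this is a test table
    // Location          /tmp/testcat/table_name
    // Provider          foo
    // Table Properties  [bar=baz]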
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index f452201a8d3ba..8d9957fe898d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -83,7 +83,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
       properties: util.Map[String, String]): Table = {
 
     val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
-    val provider = properties.getOrDefault("provider", conf.defaultDataSourceName)
+    val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
     val tableProperties = properties.asScala
     val location = Option(properties.get(TableCatalog.PROP_LOCATION))
     val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9279d2e882997..629fd28414c54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -112,7 +112,10 @@ class DataSourceV2SQLSuite
     val description = descriptionDf.collect()
     assert(description === Seq(
       Row("id", "bigint", ""),
-      Row("data", "string", "")))
+      Row("data", "string", ""),
+      Row("", "", ""),
+      Row("# Partitioning", "", ""),
+      Row("Part 0", "id", "")))
   }
 
   test("DescribeTable with v2 catalog when table does not exist.") {
@@ -125,7 +128,9 @@ class DataSourceV2SQLSuite
     spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" +
       " USING foo" +
       " PARTITIONED BY (id)" +
-      " TBLPROPERTIES ('bar'='baz')")
+      " TBLPROPERTIES ('bar'='baz')" +
+      " COMMENT 'this is a test table'" +
+      " LOCATION '/tmp/testcat/table_name'")
     val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name")
     assert(descriptionDf.schema.map(field => (field.name, field.dataType))
       === Seq(
@@ -138,14 +143,15 @@ class DataSourceV2SQLSuite
       Array("id", "bigint", ""),
       Array("data", "string", ""),
       Array("", "", ""),
-      Array("Partitioning", "", ""),
-      Array("--------------", "", ""),
+      Array("# Partitioning", "", ""),
       Array("Part 0", "id", ""),
       Array("", "", ""),
-      Array("Table Property", "Value", ""),
-      Array("----------------", "-------", ""),
-      Array("bar", "baz", ""),
-      Array("provider", "foo", "")))
+      Array("# Detailed Table Information", "", ""),
+      Array("Name", "testcat.table_name", ""),
+      Array("Comment", "this is a test table", ""),
+      Array("Location", "/tmp/testcat/table_name", ""),
+      Array("Provider", "foo", ""),
+      Array("Table Properties", "[bar=baz]", "")))
   }
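As a sanity check of the V2SessionCatalog change: the provider lookup is a plain getOrDefault against the new constant, falling back to spark.sql.sources.default when CREATE TABLE carries no USING clause. A self-contained illustration (the "parquet" literal stands in for conf.defaultDataSourceName):

    import java.util.{HashMap => JHashMap}
    import org.apache.spark.sql.connector.catalog.TableCatalog

    // No 'provider' entry: the session default data source wins.
    val props = new JHashMap[String, String]()
    assert(props.getOrDefault(TableCatalog.PROP_PROVIDER, "parquet") == "parquet")

    // Explicit provider (e.g. from USING orc) takes precedence.
    props.put(TableCatalog.PROP_PROVIDER, "orc")
    assert(props.getOrDefault(TableCatalog.PROP_PROVIDER, "parquet") == "orc")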