From 7ee9e39cb43c43d69dfe8035106f7556886e60b1 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 29 Jun 2016 19:36:21 +0800
Subject: [PATCH] [SPARK-16157][SQL] Add New Methods for comments in StructField and StructType

#### What changes were proposed in this pull request?
Based on the previous discussion with cloud-fan and hvanhovell in the related PR https://github.com/apache/spark/pull/13764#discussion_r67994276, it looks reasonable to add convenience methods for users to set a `comment` when defining a `StructField`.

Currently, the column-level `comment` attribute is stored in the `Metadata` of a `StructField`. For example, users can set the `comment` attribute in the following way:
```Scala
StructType(
  StructField(
    "cl1",
    IntegerType,
    nullable = false,
    new MetadataBuilder().putString("comment", "test").build()) :: Nil)
```
This PR adds more user-friendly methods for the `comment` attribute when defining a `StructField`. After the changes, users have three different ways to do it:
```Scala
val struct = (new StructType)
  .add("a", "int", true, "test1")

val struct = (new StructType)
  .add("c", StringType, true, "test3")

val struct = (new StructType)
  .add(StructField("d", StringType).withComment("test4"))
```

#### How was this patch tested?
Added test cases:
- `DataTypeSuite` tests the three new API variants
- `DataFrameReaderWriterSuite` covers the parquet, json, and csv formats using the in-memory catalog
- `OrcQuerySuite` covers the orc format using the Hive metastore

Author: gatorsmile

Closes #13860 from gatorsmile/newMethodForComment.
---
 .../sql/catalyst/parser/AstBuilder.scala      | 10 ++----
 .../apache/spark/sql/types/StructField.scala  | 18 ++++++++++
 .../apache/spark/sql/types/StructType.scala   | 35 +++++++++++++++++++
 .../spark/sql/types/DataTypeSuite.scala       | 17 +++++++++
 .../spark/sql/execution/command/tables.scala  |  3 +-
 .../execution/command/DDLCommandSuite.scala   |  3 +-
 .../spark/sql/sources/DDLTestSuite.scala      |  3 +-
 .../spark/sql/sources/TableScanSuite.scala    |  8 ++---
 .../sql/test/DataFrameReaderWriterSuite.scala | 27 +++++++++++++-
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 22 ++++++++++++
 10 files changed, 125 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7420a1c5965d..f2cc8d362478a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1430,13 +1430,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-
-    // Add the comment to the metadata.
-    val builder = new MetadataBuilder
-    if (STRING != null) {
-      builder.putString("comment", string(STRING))
-    }
-
-    StructField(identifier.getText, typedVisit(dataType), nullable = true, builder.build())
+    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
+    if (STRING == null) structField else structField.withComment(string(STRING))
   }
 }
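Editor's note (not part of the patch): the parser refactor above should be behavior-preserving, because `withComment` writes to the same `"comment"` metadata key the old `MetadataBuilder` code used. A minimal sketch of that equivalence; the field name and comment text are illustrative only:
```Scala
import org.apache.spark.sql.types._

// Old construction path: build the metadata by hand.
val viaBuilder = StructField(
  "col", IntegerType, nullable = true,
  metadata = new MetadataBuilder().putString("comment", "test").build())
// New construction path: let withComment fill in the metadata.
val viaHelper = StructField("col", IntegerType, nullable = true).withComment("test")
// StructField is a case class, so equality covers the metadata too.
assert(viaBuilder == viaHelper)
```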
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index 83570a5eaee61..cb8bf616968e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -51,4 +51,22 @@ case class StructField(
     ("nullable" -> nullable) ~
     ("metadata" -> metadata.jsonValue)
   }
+
+  /**
+   * Updates the StructField with a new comment value.
+   */
+  def withComment(comment: String): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putString("comment", comment)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
+  /**
+   * Return the comment of this StructField.
+   */
+  def getComment(): Option[String] = {
+    if (metadata.contains("comment")) Option(metadata.getString("comment")) else None
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 436512ff69335..0e89f71dc1cf3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -170,6 +170,23 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     StructType(fields :+ new StructField(name, dataType, nullable, metadata))
   }
 
+  /**
+   * Creates a new [[StructType]] by adding a new field and specifying metadata.
+   * {{{
+   * val struct = (new StructType)
+   *   .add("a", IntegerType, true, "comment1")
+   *   .add("b", LongType, false, "comment2")
+   *   .add("c", StringType, true, "comment3")
+   * }}}
+   */
+  def add(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      comment: String): StructType = {
+    StructType(fields :+ StructField(name, dataType, nullable).withComment(comment))
+  }
+
   /**
    * Creates a new [[StructType]] by adding a new nullable field with no metadata where the
    * dataType is specified as a String.
@@ -218,6 +235,24 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     add(name, CatalystSqlParser.parseDataType(dataType), nullable, metadata)
   }
 
+  /**
+   * Creates a new [[StructType]] by adding a new field and specifying metadata where the
+   * dataType is specified as a String.
+   * {{{
+   * val struct = (new StructType)
+   *   .add("a", "int", true, "comment1")
+   *   .add("b", "long", false, "comment2")
+   *   .add("c", "string", true, "comment3")
+   * }}}
+   */
+  def add(
+      name: String,
+      dataType: String,
+      nullable: Boolean,
+      comment: String): StructType = {
+    add(name, CatalystSqlParser.parseDataType(dataType), nullable, comment)
+  }
+
   /**
    * Extracts the [[StructField]] with the given name.
   *
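Editor's note (not part of the patch): putting the new overloads together, all three styles can be mixed on one builder chain and read back with `getComment()`. A minimal sketch; names and comment strings are illustrative:
```Scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("a", "int", nullable = true, comment = "comment1")     // dataType given as a String
  .add("b", LongType, nullable = false, comment = "comment2") // dataType given as a DataType
  .add(StructField("c", StringType).withComment("comment3"))  // via withComment

assert(schema("a").getComment() == Some("comment1"))
assert(schema("b").getComment() == Some("comment2"))
// A field added without a comment carries no "comment" metadata key:
assert(new StructType().add("d", StringType).apply("d").getComment().isEmpty)
```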
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 6b85f12521c2a..688bc3e6026ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -52,6 +52,23 @@ class DataTypeSuite extends SparkFunSuite {
     assert(StructField("b", LongType, false) === struct("b"))
   }
 
+  test("construct with add from StructField with comments") {
+    // Test creation from StructField using four different ways
+    val struct = (new StructType)
+      .add("a", "int", true, "test1")
+      .add("b", StringType, true, "test3")
+      .add(StructField("c", LongType, false).withComment("test4"))
+      .add(StructField("d", LongType))
+
+    assert(StructField("a", IntegerType, true).withComment("test1") == struct("a"))
+    assert(StructField("b", StringType, true).withComment("test3") == struct("b"))
+    assert(StructField("c", LongType, false).withComment("test4") == struct("c"))
+    assert(StructField("d", LongType) == struct("d"))
+
+    assert(struct("c").getComment() == Option("test4"))
+    assert(struct("d").getComment().isEmpty)
+  }
+
   test("construct with String DataType") {
     // Test creation with DataType as String
     val struct = (new StructType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 30dc7e81e9eeb..687d69aa5f262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -528,8 +528,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
-      val comment =
-        if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
+      val comment = column.getComment().getOrElse("")
       append(buffer, column.name, column.dataType.simpleString, comment)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 7b96f4c99ab5a..e1a7b9b0048ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -354,8 +354,7 @@ class DDLCommandSuite extends PlanTest {
     val expected = CreateTableUsing(
       TableIdentifier("my_tab"),
       Some(new StructType()
-        .add("a", IntegerType, nullable = true,
-          new MetadataBuilder().putString("comment", s"test").build())
+        .add("a", IntegerType, nullable = true, "test")
         .add("b", StringType)),
       "parquet",
       false,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 5a7a9073fb3a2..d0ad3190e02eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -45,8 +45,7 @@ case class SimpleDDLScan(
 
   override def schema: StructType =
     StructType(Seq(
-      StructField("intType", IntegerType, nullable = false,
-        new MetadataBuilder().putString("comment", s"test comment $table").build()),
+      StructField("intType", IntegerType, nullable = false).withComment(s"test comment $table"),
      StructField("stringType", StringType, nullable = false),
      StructField("dateType", DateType, nullable = false),
      StructField("timestampType", TimestampType, nullable = false),
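Editor's note (not part of the patch): the `describeSchema` and test simplifications above lean on `getComment()` returning `None` when the metadata has no `"comment"` key, so a default can be supplied with `getOrElse`. A minimal sketch of that fallback; the field names are made up:
```Scala
import org.apache.spark.sql.types._

val sn = StructField("sn", StringType).withComment("SN")
val misc = StructField("misc", StringType) // no comment set

// Same fallback pattern as the rewritten describeSchema/TableScanSuite code:
val rendered = Seq(sn, misc).map(_.getComment().getOrElse("NO_COMMENT")).mkString(",")
assert(rendered == "SN,NO_COMMENT")
```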
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 93116d84ced71..d486fa8f336d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -388,12 +388,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
         |)
       """.stripMargin)
 
-    val planned = sql("SELECT * FROM student").queryExecution.executedPlan
-    val comments = planned.schema.fields.map { field =>
-      if (field.metadata.contains("comment")) field.metadata.getString("comment")
-      else "NO_COMMENT"
-    }.mkString(",")
-
+    val planned = sql("SELECT * FROM student").queryExecution.executedPlan
+    val comments = planned.schema.fields.map(_.getComment().getOrElse("NO_COMMENT")).mkString(",")
     assert(comments === "SN,SA,NO_COMMENT")
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ebbcc1d7ffbb5..58b1d56358147 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 
@@ -391,6 +391,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     spark.range(10).write.orc(dir)
   }
 
+  test("column nullability and comment - write and then read") {
+    import testImplicits._
+
+    Seq("json", "parquet", "csv").foreach { format =>
+      val schema = StructType(
+        StructField("cl1", IntegerType, nullable = false).withComment("test") ::
+        StructField("cl2", IntegerType, nullable = true) ::
+        StructField("cl3", IntegerType, nullable = true) :: Nil)
+      val row = Row(3, null, 4)
+      val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+      val tableName = "tab"
+      withTable(tableName) {
+        df.write.format(format).mode("overwrite").saveAsTable(tableName)
+        // Verify the DDL command result: DESCRIBE TABLE
+        checkAnswer(
+          sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
+          Row("cl1", "test") :: Nil)
+        // Verify the schema
+        val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+        assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+      }
+    }
+  }
+
   private def testRead(
       df: => DataFrame,
       expectedResult: Seq[String],
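Editor's note (not part of the patch): the `expectedFields` adjustment in the test above reflects that the schema read back from these file-source tables is fully nullable, while names, types, and comment metadata are preserved. The same normalization can be written as a small helper; the helper name is hypothetical:
```Scala
import org.apache.spark.sql.types._

// Hypothetical helper mirroring the test's expected schema: after the
// write/read round trip every field is nullable, everything else is kept.
def asReadBack(schema: StructType): StructType =
  StructType(schema.fields.map(_.copy(nullable = true)))

val written = new StructType()
  .add("cl1", IntegerType, nullable = false, comment = "test")
  .add("cl2", IntegerType, nullable = true)

assert(asReadBack(written)("cl1").nullable)                     // nullability relaxed
assert(asReadBack(written)("cl1").getComment() == Some("test")) // comment survives
```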
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index cd41da7214a23..4a86987e290e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -462,4 +463,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("column nullability and comment - write and then read") {
+    val schema = (new StructType)
+      .add("cl1", IntegerType, nullable = false, comment = "test")
+      .add("cl2", IntegerType, nullable = true)
+      .add("cl3", IntegerType, nullable = true)
+    val row = Row(3, null, 4)
+    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    val tableName = "tab"
+    withTable(tableName) {
+      df.write.format("orc").mode("overwrite").saveAsTable(tableName)
+      // Verify the DDL command result: DESCRIBE TABLE
+      checkAnswer(
+        sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
+        Row("cl1", "test") :: Nil)
+      // Verify the schema
+      val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+      assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+    }
+  }
 }
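Editor's note (not part of the patch): one implementation detail worth calling out is that `withComment` copies the field's existing metadata (via `MetadataBuilder.withMetadata`) before adding the `"comment"` key, so unrelated metadata is not clobbered. A minimal sketch; the `"origin"` key is illustrative only:
```Scala
import org.apache.spark.sql.types._

val base = StructField(
  "cl1", IntegerType, nullable = false,
  metadata = new MetadataBuilder().putString("origin", "ingest").build())
val commented = base.withComment("test")

assert(commented.metadata.getString("origin") == "ingest") // pre-existing key kept
assert(commented.getComment() == Some("test"))
// Calling withComment again simply overwrites the previous comment:
assert(commented.withComment("updated").getComment() == Some("updated"))
```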