Skip to content

Commit

Permalink
[SPARK-16157][SQL] Add New Methods for comments in StructField and StructType
Browse files Browse the repository at this point in the history

#### What changes were proposed in this pull request?
Based on the previous discussion with cloud-fan and hvanhovell in another related PR apache#13764 (comment), it looks reasonable to add convenience methods for users to add a `comment` when defining a `StructField`.

Currently, the column-related `comment` attribute is stored in `Metadata` of `StructField`. For example, users can add the `comment` attribute using the following way:
```Scala
StructType(
  StructField(
    "cl1",
    IntegerType,
    nullable = false,
    new MetadataBuilder().putString("comment", "test").build()) :: Nil)
```
This PR is to add more user-friendly methods for the `comment` attribute when defining a `StructField`. After the changes, users are provided three different ways to do it:
```Scala
val struct = (new StructType)
  .add("a", "int", true, "test1")

val struct = (new StructType)
  .add("c", StringType, true, "test3")

val struct = (new StructType)
  .add(StructField("d", StringType).withComment("test4"))
```

#### How was this patch tested?
Added test cases:
- `DataTypeSuite` is for testing three types of API changes,
- `DataFrameReaderWriterSuite` is for parquet, json and csv formats - using in-memory catalog
- `OrcQuerySuite.scala` is for orc format using Hive-metastore

Author: gatorsmile <[email protected]>

Closes apache#13860 from gatorsmile/newMethodForComment.
  • Loading branch information
gatorsmile authored and cloud-fan committed Jun 29, 2016
1 parent d1e8108 commit 7ee9e39
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1430,13 +1430,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
*/
override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
  import ctx._
  // Columns parsed here are always created nullable = true.
  val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
  // The optional STRING token is the column's COMMENT clause; attach it through
  // StructField.withComment so it lands in the field's metadata.
  if (STRING == null) structField else structField.withComment(string(STRING))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,22 @@ case class StructField(
("nullable" -> nullable) ~
("metadata" -> metadata.jsonValue)
}

/**
 * Returns a copy of this StructField whose metadata carries the given comment
 * under the "comment" key, overwriting any previously stored value.
 */
def withComment(comment: String): StructField = {
  val builder = new MetadataBuilder().withMetadata(metadata)
  builder.putString("comment", comment)
  copy(metadata = builder.build())
}

/**
 * Returns the comment stored in this StructField's metadata, or None when the
 * "comment" key is absent (a null stored value also yields None).
 */
def getComment(): Option[String] = {
  if (!metadata.contains("comment")) None
  else Option(metadata.getString("comment"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,23 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
StructType(fields :+ new StructField(name, dataType, nullable, metadata))
}

/**
 * Creates a new [[StructType]] by adding a new field carrying a comment.
 * {{{
 * val struct = (new StructType)
 *   .add("a", IntegerType, true, "comment1")
 *   .add("b", LongType, false, "comment2")
 *   .add("c", StringType, true, "comment3")
 * }}}
 */
def add(
    name: String,
    dataType: DataType,
    nullable: Boolean,
    comment: String): StructType = {
  // Build the field first, attach the comment, then append it to the schema.
  val field = StructField(name, dataType, nullable).withComment(comment)
  StructType(fields :+ field)
}

/**
* Creates a new [[StructType]] by adding a new nullable field with no metadata where the
* dataType is specified as a String.
Expand Down Expand Up @@ -218,6 +235,24 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
add(name, CatalystSqlParser.parseDataType(dataType), nullable, metadata)
}

/**
 * Creates a new [[StructType]] by adding a new field carrying a comment, where
 * the dataType is given as a type string understood by the Catalyst SQL parser.
 * {{{
 * val struct = (new StructType)
 *   .add("a", "int", true, "comment1")
 *   .add("b", "long", false, "comment2")
 *   .add("c", "string", true, "comment3")
 * }}}
 */
def add(
    name: String,
    dataType: String,
    nullable: Boolean,
    comment: String): StructType = {
  // Parse the type string and delegate to the DataType-based overload.
  val parsedType = CatalystSqlParser.parseDataType(dataType)
  add(name, parsedType, nullable, comment)
}

/**
* Extracts the [[StructField]] with the given name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ class DataTypeSuite extends SparkFunSuite {
assert(StructField("b", LongType, false) === struct("b"))
}

test("construct with add from StructField with comments") {
  // Exercise every add/withComment variant: string type name, DataType,
  // pre-built StructField with comment, and pre-built StructField without one.
  val schema = (new StructType)
    .add("a", "int", true, "test1")
    .add("b", StringType, true, "test3")
    .add(StructField("c", LongType, false).withComment("test4"))
    .add(StructField("d", LongType))

  // Each resulting field must equal an independently constructed StructField.
  assert(schema("a") == StructField("a", IntegerType, true).withComment("test1"))
  assert(schema("b") == StructField("b", StringType, true).withComment("test3"))
  assert(schema("c") == StructField("c", LongType, false).withComment("test4"))
  assert(schema("d") == StructField("d", LongType))

  // getComment() surfaces a stored comment and is empty when none was set.
  assert(schema("c").getComment() == Option("test4"))
  assert(schema("d").getComment().isEmpty)
}

test("construct with String DataType") {
// Test creation with DataType as String
val struct = (new StructType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF

/** Appends one (name, simple type string, comment-or-empty) row per column of `schema`. */
private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
  schema.foreach { column =>
    // getComment() reads the "comment" metadata key; absent comments print as "".
    val comment = column.getComment().getOrElse("")
    append(buffer, column.name, column.dataType.simpleString, comment)
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ class DDLCommandSuite extends PlanTest {
val expected = CreateTableUsing(
TableIdentifier("my_tab"),
Some(new StructType()
.add("a", IntegerType, nullable = true,
new MetadataBuilder().putString("comment", s"test").build())
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType)),
"parquet",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ case class SimpleDDLScan(

override def schema: StructType =
StructType(Seq(
StructField("intType", IntegerType, nullable = false,
new MetadataBuilder().putString("comment", s"test comment $table").build()),
StructField("intType", IntegerType, nullable = false).withComment(s"test comment $table"),
StructField("stringType", StringType, nullable = false),
StructField("dateType", DateType, nullable = false),
StructField("timestampType", TimestampType, nullable = false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
|)
""".stripMargin)

val planned = sql("SELECT * FROM student").queryExecution.executedPlan
val comments = planned.schema.fields.map { field =>
if (field.metadata.contains("comment")) field.metadata.getString("comment")
else "NO_COMMENT"
}.mkString(",")

val planned = sql("SELECT * FROM student").queryExecution.executedPlan
val comments = planned.schema.fields.map(_.getComment().getOrElse("NO_COMMENT")).mkString(",")
assert(comments === "SN,SA,NO_COMMENT")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.Utils


Expand Down Expand Up @@ -391,6 +391,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.range(10).write.orc(dir)
}

test("column nullability and comment - write and then read") {
  import testImplicits._

  // Run the identical round-trip check against each built-in file format.
  Seq("json", "parquet", "csv").foreach { format =>
    val schema = StructType(
      StructField("cl1", IntegerType, nullable = false).withComment("test") ::
      StructField("cl2", IntegerType, nullable = true) ::
      StructField("cl3", IntegerType, nullable = true) :: Nil)
    val df = spark.createDataFrame(sparkContext.parallelize(Row(3, null, 4) :: Nil), schema)

    val tableName = "tab"
    withTable(tableName) {
      df.write.format(format).mode("overwrite").saveAsTable(tableName)
      // DESCRIBE TABLE must surface the comment stored on cl1.
      checkAnswer(
        sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
        Row("cl1", "test") :: Nil)
      // The read-back schema is compared against the written one with
      // nullability relaxed to true on every field.
      val relaxedFields = schema.fields.map(f => f.copy(nullable = true))
      assert(spark.table(tableName).schema == schema.copy(fields = relaxedFields))
    }
  }
}

private def testRead(
df: => DataFrame,
expectedResult: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructType}

case class AllDataTypesWithNonPrimitiveType(
stringField: String,
Expand Down Expand Up @@ -462,4 +463,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
}

test("column nullability and comment - write and then read") {
  // Build the schema through the new add(name, type, nullable, comment) overload.
  val schema = (new StructType)
    .add("cl1", IntegerType, nullable = false, comment = "test")
    .add("cl2", IntegerType, nullable = true)
    .add("cl3", IntegerType, nullable = true)
  val df = spark.createDataFrame(sparkContext.parallelize(Row(3, null, 4) :: Nil), schema)

  val tableName = "tab"
  withTable(tableName) {
    df.write.format("orc").mode("overwrite").saveAsTable(tableName)
    // DESCRIBE TABLE must surface the comment stored on cl1.
    checkAnswer(
      sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
      Row("cl1", "test") :: Nil)
    // The read-back schema is compared against the written one with
    // nullability relaxed to true on every field.
    val relaxedFields = schema.fields.map(f => f.copy(nullable = true))
    assert(spark.table(tableName).schema == schema.copy(fields = relaxedFields))
  }
}
}

0 comments on commit 7ee9e39

Please sign in to comment.