From f099a678e1f6dc0bfb591206402397d492e7720d Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Mon, 18 Jun 2018 13:47:53 -0700
Subject: [PATCH 1/5] [SPARK-24583] Wrong schema type in InsertIntoDataSourceCommand

InsertIntoDataSourceCommand stamps the catalog table's schema onto the
input data, so non-nullable fields in the table schema get applied to
query output that may still contain nulls. Stamp the nullable variant
of the table schema instead, and add a regression test with a custom
InsertableRelation that checks nullability.

---
 .../InsertIntoDataSourceCommand.scala         |  3 +-
 .../spark/sql/sources/InsertSuite.scala       | 81 ++++++++++++++++++-
 2 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index a813829d50cb1..3cfdfcc1812f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -39,7 +39,8 @@ case class InsertIntoDataSourceCommand(
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+    val df = sparkSession.internalCreateDataFrame(
+      data.queryExecution.toRdd, logicalRelation.schema.asNullable)
     relation.insert(df, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fef01c860db6e..e130e03113b17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,12 +20,47 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+    sqlContext: SQLContext,
+    parameters: Map[String, String],
+    schema: StructType): BaseRelation = {
+    SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+    input.foreach { row =>
+      schema.fields.zipWithIndex.filter(!_._1.nullable).foreach { field =>
+        if (row.get(field._2) == null) {
+          throw new NotNullableViolationException(field._1.name)
+        }
+      }
+    }
+  }
+}
+
+class NotNullableViolationException(val message: String)
+  extends Exception(message) with Serializable {
+  override def getMessage: String = s"Value for column '$message' cannot be null."
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -520,4 +555,48 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", IntegerType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some("org.apache.spark.sql.sources.SimpleInsertSource"))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      def verifyException(e: Exception, column: String): Unit = {
+        var ex = e.getCause
+        while (ex != null &&
+            !ex.isInstanceOf[NotNullableViolationException]) {
+          ex = ex.getCause
+        }
+        if (ex == null) {
+          fail(s"Expected a NotNullableViolationException but got '${e.getMessage}'.")
+        }
+        assert(ex.getMessage.contains(s"Value for column '$column' cannot be null."))
+      }
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      verifyException(
+        intercept[SparkException] {
+          sql("INSERT INTO TABLE test_table SELECT null, 'b'")
+        }, "i")
+      verifyException(
+        intercept[SparkException] {
+          sql("INSERT INTO TABLE test_table SELECT 2, null")
+        }, "s")
+    }
+  }
 }
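Reviewer note (PATCH 1/5): internalCreateDataFrame stamps the supplied schema
onto the row RDD as-is, so reusing the catalog schema marks query output as
non-null even when it actually contains nulls; asNullable relaxes every field
first. Since StructType.asNullable is private[spark], the standalone sketch
below builds the equivalent nullable copy of a flat schema by hand
(NullableSchemaDemo is an illustrative name, not part of the patch; it only
assumes spark-sql on the classpath):

import org.apache.spark.sql.types._

object NullableSchemaDemo {
  def main(args: Array[String]): Unit = {
    // A catalog schema that promises non-null columns, like the test table's.
    val catalogSchema = new StructType()
      .add("i", IntegerType, nullable = false)
      .add("s", StringType, nullable = false)
    // Equivalent of schema.asNullable for a flat schema: mark every field
    // nullable so the stamped schema no longer over-promises.
    val relaxed = StructType(catalogSchema.map(_.copy(nullable = true)))
    relaxed.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
    // prints "i: nullable=true" and "s: nullable=true"
  }
}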
From 8add16939c48563f9152cc71007cc50ff6124560 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Tue, 19 Jun 2018 09:59:03 -0700
Subject: [PATCH 2/5] Remove applying the table schema to the insert input

---
 .../execution/datasources/InsertIntoDataSourceCommand.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 3cfdfcc1812f1..ee29c6e7426e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -38,10 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(
-      data.queryExecution.toRdd, logicalRelation.schema.asNullable)
-    relation.insert(df, overwrite)
+    // Data should have been cast to the schema of the insert relation.
+    relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
     // data source relation.
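Reviewer note (PATCH 2/5): instead of re-stamping any schema, the command now
trusts the analyzer to have already shaped the input. A minimal sketch of a
relation that echoes what insert() actually receives, handy for checking that
the incoming DataFrame carries the analyzed plan's schema (SchemaEchoSource
and SchemaEchoRelation are illustrative names modeled on the test's
SimpleInsertSource, not part of the patch):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

class SchemaEchoSource extends SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    new SchemaEchoRelation(sqlContext, schema)
  }
}

class SchemaEchoRelation(ctx: SQLContext, userSchema: StructType)
  extends BaseRelation with InsertableRelation {

  override def sqlContext: SQLContext = ctx

  override def schema: StructType = userSchema

  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
    // With this patch, `input` keeps the schema produced by analysis instead
    // of the catalog schema re-stamped by InsertIntoDataSourceCommand.
    println(input.schema.treeString)
    input.collect()
  }
}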
From bb9fa037605b25c8503f288c3e6d870a4b208904 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Tue, 19 Jun 2018 10:05:42 -0700
Subject: [PATCH 3/5] Simplify test case

---
 .../spark/sql/sources/InsertSuite.scala       | 36 ++-----------------
 1 file changed, 3 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index e130e03113b17..40f56d4b76564 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -46,21 +46,10 @@ case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSes
   override def schema: StructType = userSpecifiedSchema
 
   override def insert(input: DataFrame, overwrite: Boolean): Unit = {
-    input.foreach { row =>
-      schema.fields.zipWithIndex.filter(!_._1.nullable).foreach { field =>
-        if (row.get(field._2) == null) {
-          throw new NotNullableViolationException(field._1.name)
-        }
-      }
-    }
+    input.collect
   }
 }
 
-class NotNullableViolationException(val message: String)
-  extends Exception(message) with Serializable {
-  override def getMessage: String = s"Value for column '$message' cannot be null."
-}
-
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -559,7 +548,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
   test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
     withTable("test_table") {
       val schema = new StructType()
-        .add("i", IntegerType, false)
+        .add("i", LongType, false)
         .add("s", StringType, false)
       val newTable = CatalogTable(
         identifier = TableIdentifier("test_table", None),
@@ -576,27 +565,8 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
           compressed = false,
           properties = Map.empty),
         schema = schema,
         provider = Some("org.apache.spark.sql.sources.SimpleInsertSource"))
 
       spark.sessionState.catalog.createTable(newTable, false)
 
-      def verifyException(e: Exception, column: String): Unit = {
-        var ex = e.getCause
-        while (ex != null &&
-            !ex.isInstanceOf[NotNullableViolationException]) {
-          ex = ex.getCause
-        }
-        if (ex == null) {
-          fail(s"Expected a NotNullableViolationException but got '${e.getMessage}'.")
-        }
-        assert(ex.getMessage.contains(s"Value for column '$column' cannot be null."))
-      }
-
       sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
-      verifyException(
-        intercept[SparkException] {
-          sql("INSERT INTO TABLE test_table SELECT null, 'b'")
-        }, "i")
-      verifyException(
-        intercept[SparkException] {
-          sql("INSERT INTO TABLE test_table SELECT 2, null")
-        }, "s")
+      sql("INSERT INTO TABLE test_table SELECT 2, null")
     }
   }
 }

From 03c3c90f982a6bf4b2a3b24271679cc124f96480 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Tue, 19 Jun 2018 10:10:01 -0700
Subject: [PATCH 4/5] Change indentation

---
 .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 40f56d4b76564..78e5ba4be6d05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -31,9 +31,9 @@ import org.apache.spark.util.Utils
 
 class SimpleInsertSource extends SchemaRelationProvider {
   override def createRelation(
-    sqlContext: SQLContext,
-    parameters: Map[String, String],
-    schema: StructType): BaseRelation = {
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
     SimpleInsert(schema)(sqlContext.sparkSession)
   }
 }
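Reviewer note (PATCHES 3-4/5): patch 4 only moves the parameter list to
Spark's continuation-indent style. Patch 3's switch of column i from
IntegerType to LongType makes the simplified test also cover the analyzer's
implicit cast: SELECT 1 produces an int column, so the insert only works
because analysis casts it to the table's bigint column. A quick standalone
way to see the two shapes (CastDemo is an illustrative name; assumes a local
SparkSession):

import org.apache.spark.sql.SparkSession

object CastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("CastDemo").getOrCreate()
    try {
      // The literal 1 analyzes as int...
      println(spark.sql("SELECT 1 AS i, 'a' AS s").schema.simpleString)
      // struct<i:int,s:string>
      // ...while the insert target expects bigint, which the analyzer
      // reaches by adding a cast like this one.
      println(spark.sql("SELECT CAST(1 AS BIGINT) AS i, 'a' AS s").schema.simpleString)
      // struct<i:bigint,s:string>
    } finally {
      spark.stop()
    }
  }
}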
From 049844e5a99a45e2537134b5f6f620014e0cbb37 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Tue, 19 Jun 2018 10:15:29 -0700
Subject: [PATCH 5/5] Code clean-up

---
 .../sql/execution/datasources/InsertIntoDataSourceCommand.scala | 2 +-
 .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index ee29c6e7426e5..80d7608a22891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -38,7 +38,7 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Data should have been cast to the schema of the insert relation.
+    // Data has been cast to the target relation's schema by the PreprocessTableInsertion rule.
     relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 78e5ba4be6d05..438d5d8176b8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -561,7 +561,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
           compressed = false,
           properties = Map.empty),
         schema = schema,
-        provider = Some("org.apache.spark.sql.sources.SimpleInsertSource"))
+        provider = Some(classOf[SimpleInsertSource].getName))
 
       spark.sessionState.catalog.createTable(newTable, false)
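Reviewer note (PATCH 5/5): classOf[SimpleInsertSource].getName is checked by
the compiler and survives renames, unlike the hand-written fully qualified
string. To replay the final scenario outside the suite, a self-contained
sketch (Spark24583Walkthrough is an illustrative name; assumes a Spark
2.3-era build with the patched SimpleInsertSource test class on the
classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.sources.SimpleInsertSource
import org.apache.spark.sql.types._

object Spark24583Walkthrough {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-24583").getOrCreate()
    try {
      val schema = new StructType()
        .add("i", LongType, nullable = false)
        .add("s", StringType, nullable = false)
      val table = CatalogTable(
        identifier = TableIdentifier("test_table", None),
        tableType = CatalogTableType.EXTERNAL,
        storage = CatalogStorageFormat.empty,
        schema = schema,
        provider = Some(classOf[SimpleInsertSource].getName))
      spark.sessionState.catalog.createTable(table, ignoreIfExists = false)
      // Both inserts succeed after the fix: the int literal is cast to long,
      // and the null value no longer trips a wrongly non-nullable schema.
      spark.sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
      spark.sql("INSERT INTO TABLE test_table SELECT 2, null")
    } finally {
      spark.stop()
    }
  }
}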