[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand
## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to "insertRelationType.asNullable" in order to avoid overriding the nullability of the incoming data.
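
As a rough illustration (not part of this patch), here is a minimal sketch of the nullability mismatch the description refers to, using only public `StructType` APIs. The `NullabilitySketch` wrapper and the `relaxed` value are illustrative only; `relaxed` simply mirrors what `asNullable` would produce for the schema used in the added test.

```scala
import org.apache.spark.sql.types._

// Hypothetical standalone sketch; not part of the Spark sources touched by this commit.
object NullabilitySketch {
  def main(args: Array[String]): Unit = {
    // The target table declares both columns non-nullable, exactly as in the added test.
    val tableSchema = new StructType()
      .add("i", LongType, nullable = false)
      .add("s", StringType, nullable = false)

    // Re-applying this exact schema to the query output (what the removed
    // internalCreateDataFrame call effectively did) stamps the incoming columns as
    // non-nullable, even though "INSERT INTO TABLE test_table SELECT 2, null"
    // produces a null. Relaxing nullability avoids overriding the data's nullability.
    val relaxed = StructType(tableSchema.map(_.copy(nullable = true)))

    println(tableSchema.map(f => s"${f.name}:${f.nullable}").mkString(", ")) // i:false, s:false
    println(relaxed.map(f => s"${f.name}:${f.nullable}").mkString(", "))     // i:true, s:true
  }
}
```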

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue <[email protected]>

Closes #21585 from maryannxue/spark-24583.

(cherry picked from commit bc0498d)
Signed-off-by: Xiao Li <[email protected]>
maryannxue authored and gatorsmile committed Jun 19, 2018
1 parent 50cdb41 commit d687d97
Showing 2 changed files with 52 additions and 4 deletions.
InsertIntoDataSourceCommand.scala
@@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    // Data has been casted to the target relation's schema by the PreprocessTableInsertion rule.
+    relation.insert(data, overwrite)
 
     // Re-cache all cached plans(including this relation itself, if it's cached) that refer to this
     // data source relation.
InsertSuite.scala
@@ -20,12 +20,36 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+    input.collect
+  }
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", LongType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some(classOf[SimpleInsertSource].getName))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      sql("INSERT INTO TABLE test_table SELECT 2, null")
+    }
+  }
 }
