[CARMEL-6033] NullPointException in uploading when NOT NULL constraint enforced (#976)
fenzhu authored and GitHub Enterprise committed Jun 17, 2022
1 parent 2a16fb3 commit 70a1bfe
Showing 2 changed files with 44 additions and 1 deletion.
@@ -767,7 +767,13 @@ case class UploadDataCommand(
       .schema(if (optionSpec.getOrElse(Map.empty).get("header").getOrElse("false") == "true") {
         null
       } else {
-        targetTable.schema
+        if (sparkSession.sessionState.conf.enforceSchemaNotNull) {
+          // We do not use NOT NULL information in target table to avoid wrong codegen
+          // These constraints are enforced in subsequent insertion phases
+          targetTable.schema.asNullable
+        } else {
+          targetTable.schema
+        }
       })
       .options(if (optionSpec.nonEmpty) optionSpec.get
         else scala.collection.Map("header" -> "false", "delimiter" -> ","))
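Why the relaxation helps: when the target table carries NOT NULL fields, handing that schema straight to the CSV reader lets codegen assume those columns can never be null, so a null cell in the uploaded file can blow up with a NullPointerException before any constraint check runs. Reading with an all-nullable schema and enforcing NOT NULL in the later insertion phase turns that into the intended constraint error. A minimal sketch of the relaxation using only public StructType APIs (`asNullable` itself is Spark-internal; `relaxToNullable` is a hypothetical helper for illustration only, not part of this commit):

import org.apache.spark.sql.types.{StructField, StructType}

// Roughly what schema.asNullable does for a flat schema: mark every
// top-level field nullable so the CSV reader never assumes non-null values.
def relaxToNullable(schema: StructType): StructType =
  StructType(schema.fields.map(f => f.copy(nullable = true)))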
@@ -282,6 +282,43 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
   }
 }
 
+  test("CARMEL-6033: NullPointException in uploading when NOT NULL constraint enforced") {
+    import testImplicits._
+    withTable("t") {
+      withTempDir { dir =>
+        withSQLConf(SQLConf.ENFORCE_SCHEMA_NOT_NULL.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE t(id int, data string not null) USING parquet
+             """.stripMargin)
+
+          val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+          val defaultFs = FileSystem.getDefaultUri(sparkContext.hadoopConfiguration).getScheme
+          val isDefaultLocal = defaultFs == null || defaultFs == "file"
+          val basePath = dir.getCanonicalPath
+
+          val table = TableIdentifier("t")
+          val catalog = spark.sessionState.catalog
+          val targetTable = catalog.getTableMetadata(table)
+
+          val uploadDataCommand = UploadDataCommand(table, "", isOverwrite = true, None, None)
+          val path = new Path(basePath, "csv")
+          val tuples: Seq[(Int, String)] = Seq((2, null))
+          tuples.toDF().coalesce(1).write.csv(path.toString)
+          val files = fs.listStatus(new Path(path.toString))
+            .filter(p => p.getPath.toString.endsWith(".csv")).map(p => p.getPath.toString)
+          val csvFile = new Path(files(0))
+          val e = intercept[org.apache.spark.SparkException] {
+            uploadDataCommand.performUpload(spark, fs, targetTable,
+              csvFile, isDefaultLocal)
+          }
+          assert(Utils.findFirstCause(e).getMessage.contains(
+            "NOT NULL constraint violated for column: data"))
+        }
+      }
+    }
+  }
+
   private def getCatalogTable(tableIdent: TableIdentifier): CatalogTable = {
     spark.sessionState.catalog.getTableMetadata(tableIdent)
   }
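The test builds the exact situation the fix targets: a table whose data column is declared NOT NULL, an uploaded CSV containing a null in that column, and an assertion that the failure is the constraint error rather than an NPE. A self-contained sketch of that setup outside the suite (hypothetical scratch path and local SparkSession; UploadDataCommand and Utils.findFirstCause are internal to this fork, so only the plain CSV round-trip with a relaxed read schema is shown):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Hypothetical scratch directory; one row whose "data" value is null.
val csvDir = "/tmp/carmel-6033-demo"
Seq((2, null: String)).toDF("id", "data").coalesce(1).write.mode("overwrite").csv(csvDir)

// The table schema as declared: "data" is NOT NULL.
val declared = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("data", StringType, nullable = false)))

// With ENFORCE_SCHEMA_NOT_NULL on, the upload path reads with the relaxed schema
// and defers the NOT NULL check to the insertion phase.
val relaxed = StructType(declared.fields.map(_.copy(nullable = true)))
spark.read.schema(relaxed).csv(csvDir).show()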
