Commit

[Spark] Make update catalog schema truncation threshold configurable (#2911)


#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, during schema sync to the catalog, the whole schema gets truncated if any of the fields is longer than 4000 characters. This PR makes this threshold configurable through the config `DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD`.
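
For illustration, a minimal sketch (not part of this PR) of how a session could opt into a higher threshold; the 8000-character value, the app name, and the Hive-enabled session builder are assumptions, only the two config constants come from this change:

```scala
// Minimal sketch, not part of this PR: raise the threshold for a session so that a
// single long nested field no longer forces the catalog schema to be truncated.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder()
  .appName("catalog-schema-sync-demo")   // illustrative app name
  .enableHiveSupport()
  .getOrCreate()

// Schema syncing to the catalog must be enabled for the threshold to matter.
spark.conf.set(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key, "true")
// Allow fields whose catalog DDL string is up to 8000 characters (default: 4000)
// before the whole schema is replaced with an empty schema in the metastore.
spark.conf.set(DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key, "8000")

// Subsequent catalog updates (e.g. CREATE TABLE ... USING delta on a table with a
// wide nested struct) now compare each field against 8000 instead of the
// previously hard-coded 4000.
```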

## How was this patch tested?

Added variations of existing test cases that validate that setting the config to a larger value skips truncation.

## Does this PR introduce _any_ user-facing changes?

No

Co-authored-by: Tathagata Das <[email protected]>
dhruvarya-db and tdas authored Apr 24, 2024
1 parent 5ace827 commit 3c09d95
Showing 4 changed files with 151 additions and 89 deletions.
@@ -605,12 +605,11 @@ case class CreateDeltaTableCommand(
if (conf.getConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)) {
// In the case we're creating a Delta table on an existing path and adopting the schema
val schema = if (table.schema.isEmpty) snapshot.schema else table.schema
val truncatedSchema = UpdateCatalog.truncateSchemaIfNecessary(schema)
val additionalProperties = if (truncatedSchema.isEmpty) {
Map(UpdateCatalog.ERROR_KEY -> UpdateCatalog.LONG_SCHEMA_ERROR)
} else {
Map.empty
}
val truncationThreshold = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
val (truncatedSchema, additionalProperties) = UpdateCatalog.truncateSchemaIfNecessary(
snapshot.schema,
truncationThreshold)

table.copy(
schema = truncatedSchema,
@@ -198,11 +198,14 @@ case class UpdateCatalog(table: CatalogTable) extends UpdateCatalogBase {


override protected def schemaHasChanged(snapshot: Snapshot, spark: SparkSession): Boolean = {
// We need to check whether the schema in the catalog matches the current schema. If a
// field in the schema is very long, we cannot store the schema in the catalog, therefore
// here we have to compare what's in the catalog with what we actually can store in the
// catalog
val schemaChanged = UpdateCatalog.truncateSchemaIfNecessary(snapshot.schema) != table.schema
// We need to check whether the schema in the catalog matches the current schema.
// Depending on the schema validation policy, the schema might need to be truncated.
// Therefore, we should use what we want to store in the catalog for comparison.
val truncationThreshold = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
val schemaChanged = table.schema != UpdateCatalog.truncateSchemaIfNecessary(
snapshot.schema,
truncationThreshold)._1
// The table may have been dropped as we're just about to update the information. There is
// unfortunately no great way to avoid a race condition, but we do one last check here as
// updates may have been queued for some time.
@@ -261,11 +264,11 @@ object UpdateCatalog {
// This is the encoding of the database for the Hive MetaStore
private val latin1 = Charset.forName("ISO-8859-1")

// Maximum number of characters that a catalog can store.
val MAX_CATALOG_TYPE_DDL_LENGTH = 4000
val ERROR_KEY = "delta.catalogUpdateError"
val LONG_SCHEMA_ERROR: String = "The schema contains a very long nested field and cannot be " +
"stored in the catalog."
val NON_LATIN_CHARS_ERROR: String = "The schema contains non-latin encoding characters and " +
"cannot be stored in the catalog."
val HIVE_METASTORE_NAME = "hive_metastore"

private def getOrCreateExecutionContext(conf: SQLConf): ExecutionContext = synchronized {
@@ -313,12 +316,11 @@
catalog.qualifyIdentifier(TableIdentifier(table.identifier.table, Some(table.database)))
val db = qualifiedIdentifier.database.get
val tblName = qualifiedIdentifier.table
val schema = truncateSchemaIfNecessary(snapshot.schema)
val additionalProperties = if (schema.isEmpty) {
Map(ERROR_KEY -> LONG_SCHEMA_ERROR)
} else {
Map.empty
}
val truncationThreshold = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
val (schema, additionalProperties) = truncateSchemaIfNecessary(
snapshot.schema,
truncationThreshold)

// We call the lower level API so that we can actually drop columns. We also assume that
// all columns are data columns so that we don't have to deal with partition columns
@@ -346,25 +348,28 @@
}

/**
* If a field in the schema has a very long string representation, then the schema will be
* If the schema contains non-latin encoding characters, the schema can become garbled.
* We need to truncate the schema in that case.
* Also, if any of the fields is longer than `truncationThreshold`, then the schema will be
* truncated to an empty schema to avoid corruption.
* Also, if the schema contains non-latin encoding characters, the schema will be garbled. In
* this case we also truncate the schema.
*
* @return a tuple of the truncated schema and a map of error messages if any.
* The error message is only set if the schema is truncated. Truncation
* can happen if the schema is too long or if it contains non-latin characters.
*/
def truncateSchemaIfNecessary(schema: StructType): StructType = {
def truncateSchemaIfNecessary(
schema: StructType,
truncationThreshold: Long): (StructType, Map[String, String]) = {
// Encoders are not threadsafe
val encoder = latin1.newEncoder()
def isColumnValid(f: StructField): Boolean = {
val typeString = f.dataType.catalogString
encoder.canEncode(f.name) &&
typeString.length <= MAX_CATALOG_TYPE_DDL_LENGTH &&
encoder.canEncode(typeString)
}

if (schema.exists(f => !isColumnValid(f))) {
new StructType()
} else {
schema
schema.foreach { f =>
if (f.dataType.catalogString.length > truncationThreshold) {
return (new StructType(), Map(UpdateCatalog.ERROR_KEY -> LONG_SCHEMA_ERROR))
}
if (!encoder.canEncode(f.name) || !encoder.canEncode(f.dataType.catalogString)) {
return (new StructType(), Map(UpdateCatalog.ERROR_KEY -> NON_LATIN_CHARS_ERROR))
}
}
(schema, Map.empty)
}
}
@@ -538,6 +538,15 @@ trait DeltaSQLConfBase {
.checkValue(_ > 0, "threadPoolSize must be positive")
.createWithDefault(20)

val DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD =
buildConf("catalog.update.longFieldTruncationThreshold")
.internal()
.doc(
"When syncing table schema to the catalog, Delta will truncate the whole schema " +
"if any field is longer than this threshold.")
.longConf
.createWithDefault(4000)

val DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE =
buildStaticConf("commitStore.getCommits.threadPoolSize")
.internal()
@@ -22,7 +22,6 @@ import scala.util.control.NonFatal

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.delta.hooks.UpdateCatalog.MAX_CATALOG_TYPE_DDL_LENGTH
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaHiveTest
import com.fasterxml.jackson.core.JsonParseException
@@ -186,6 +185,9 @@ class DeltaUpdateCatalogSuite
}
}

val MAX_CATALOG_TYPE_DDL_LENGTH: Long =
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.defaultValue.get


test("convert to delta with partitioning change") {
withTable(tbl) {
@@ -326,8 +328,6 @@
}


import UpdateCatalog.MAX_CATALOG_TYPE_DDL_LENGTH

test("Very long schemas can be stored in the catalog") {
withTable(tbl) {
val schema = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
@@ -340,47 +340,108 @@
}
}

test("Schemas that contain very long fields cannot be stored in the catalog") {
withTable(tbl) {
val schema = new StructType()
.add("i", StringType)
.add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
s"characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH, 4020))
test(s"Schemas that contain very long fields cannot be stored in the catalog " +
" when longer than the truncation threshold " +
s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
withSQLConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
truncationThreshold.toString) {
withTable(tbl) {
val schema = new StructType()
.add("i", StringType)
.add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
require(
schema.toDDL.length >= 4020,
s"The length of the schema should be over 4020 " +
s"characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
if (truncationThreshold > 4020) {
verifyTableMetadata(expectedSchema = schema)
} else {
verifySchemaInCatalog()
}
}
}
}

test("Schemas that contain very long fields cannot be stored in the catalog - array") {
withTable(tbl) {
val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
val schema = new StructType()
.add("i", StringType)
.add("array", ArrayType(struct))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
s"characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
test(s"Schemas that contain very long fields cannot be stored in the catalog - array" +
" when longer than the truncation threshold " +
s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
withSQLConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
truncationThreshold.toString) {
withTable(tbl) {
val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
val schema = new StructType()
.add("i", StringType)
.add("array", ArrayType(struct))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
s"characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
if (truncationThreshold == 99999) {
verifyTableMetadata(expectedSchema = schema)
} else {
verifySchemaInCatalog()
}
}
}
}

test("Schemas that contain very long fields cannot be stored in the catalog - map") {
withTable(tbl) {
val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
val schema = new StructType()
.add("i", StringType)
.add("map", MapType(StringType, struct))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
s"characters for this test")
for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
test(s"Schemas that contain very long fields cannot be stored in the catalog - map" +
" when longer than the truncation threshold " +
s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
withSQLConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
truncationThreshold.toString) {
withTable(tbl) {
val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
val schema = new StructType()
.add("i", StringType)
.add("map", MapType(StringType, struct))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
s"characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
if (truncationThreshold == 99999) {
verifyTableMetadata(expectedSchema = schema)
} else {
verifySchemaInCatalog()
}
}
}
}

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
test(s"Very long nested fields cannot be stored in the catalog - partitioned" +
" when longer than the truncation threshold " +
s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
withSQLConf(
DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
truncationThreshold.toString) {
withTable(tbl) {
val schema = new StructType()
.add("i", StringType)
.add("part", StringType)
.add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
require(
schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
"The length of the schema should be over 4000 characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta PARTITIONED BY (part)")
if (truncationThreshold == 99999) {
verifyTableMetadata(expectedSchema = schema)
} else {
verifySchemaInCatalog()
}
}
}
}

@@ -396,34 +457,21 @@
}
}

test("Very long nested fields cannot be stored in the catalog - partitioned") {
withTable(tbl) {
val schema = new StructType()
.add("i", StringType)
.add("part", StringType)
.add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
"The length of the schema should be over 4000 characters for this test")

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta PARTITIONED BY (part)")
verifySchemaInCatalog()
}
}

// scalastyle:off nonascii
test("Schema containing non-latin characters cannot be stored - top-level") {
withTable(tbl) {
val schema = new StructType().add("今天", "string")
sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
}
}

test("Schema containing non-latin characters cannot be stored - struct") {
withTable(tbl) {
val schema = new StructType().add("struct", new StructType().add("今天", "string"))
sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
}
}

@@ -434,7 +482,7 @@
.add("array", ArrayType(new StructType().add("今天", "string")))

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
}
}

@@ -445,7 +493,7 @@
.add("map", MapType(StringType, new StructType().add("今天", "string")))

sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
verifySchemaInCatalog()
verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
}
}
// scalastyle:on nonascii
@@ -456,7 +504,8 @@
*/
private def verifySchemaInCatalog(
table: String = tbl,
catalogPartitionCols: Seq[String] = Nil): Unit = {
catalogPartitionCols: Seq[String] = Nil,
expectedErrorMessage: String = UpdateCatalog.LONG_SCHEMA_ERROR): Unit = {
val cat = spark.sessionState.catalog.externalCatalog.getTable("default", table)
assert(cat.schema.isEmpty, s"Schema wasn't empty")
assert(cat.partitionColumnNames === catalogPartitionCols)
Expand All @@ -465,7 +514,7 @@ class DeltaUpdateCatalogSuite
s"Properties didn't match for table: $table. Expected: ${getBaseProperties(snapshot)}, " +
s"Got: ${cat.properties}")
}
assert(cat.properties(UpdateCatalog.ERROR_KEY) === UpdateCatalog.LONG_SCHEMA_ERROR)
assert(cat.properties(UpdateCatalog.ERROR_KEY) === expectedErrorMessage)

// Make sure table is readable
checkAnswer(spark.table(table), Nil)
