[Spark] Add dataframe reader options to unblock non-additive schema changes (delta-io#4126)

## Description

Non-additive schema changes (DROP/RENAME and, since
https://github.com/databricks-eng/runtime/pull/124363, type changes) block a
stream until the user sets a SQL conf to unblock them:
```
spark.databricks.delta.streaming.allowSourceColumnRename
spark.databricks.delta.streaming.allowSourceColumnDrop
spark.databricks.delta.streaming.allowSourceColumnTypeChange
```
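For instance, unblocking a column drop for a single stream before restarting it; a minimal sketch, where `ckpt_123456` stands in for the checkpoint hash reported in the error message and `2` for the schema change version:
```
// Unblock DROP for this stream, only for the schema change at version 2
// (ckpt_123456 is the checkpoint hash printed in the error message).
spark.sql("SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = 2")

// Or unblock DROP for all streams in this session:
spark.conf.set("spark.databricks.delta.streaming.allowSourceColumnDrop", "always")
```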
This change adds dataframe reader options as an alternative to SQL confs
to unblock non-additive schema changes:
```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```

## How was this patch tested?
Extended existing tests in `DeltaSourceMetadataEvolutionSupportSuite` to
also cover dataframe reader options.

## This PR introduces the following *user-facing* changes
The error thrown on non-additive schema changes during streaming is
updated to suggest dataframe reader options in addition to SQL confs to
unblock the stream:
```
[DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION]
We've detected one or more non-additive schema change(s) (DROP) between Delta version 1 and 2 in the Delta streaming source.
Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version 2.
Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.

<NEW>
Using dataframe reader option(s):
To unblock for this particular stream just for this series of schema change(s):
  .option("allowSourceColumnDrop", "2")
To unblock for this particular stream:
  .option("allowSourceColumnDrop", "always")
<NEW>

Using SQL configuration(s):
To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = 2;
To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = "always";
To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop = "always";

```
Users can set the corresponding reader option(s) to unblock a given type of
non-additive schema change, for example:
```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```
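A full restart of a blocked stream might then look like the following; a sketch under assumed paths (`/data/source`, `/data/sink`, and `/chk` are hypothetical):
```
import org.apache.spark.sql.streaming.Trigger

// Restart the stream, vetting the column drop via the reader option
// instead of a session-wide SQL conf.
val query = spark.readStream
  .format("delta")
  .option("allowSourceColumnDrop", "always") // or the schema change version, e.g. "2"
  .load("/data/source")                      // hypothetical source table path
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/chk")      // hypothetical checkpoint path
  .trigger(Trigger.AvailableNow())
  .start("/data/sink")                       // hypothetical sink table path
```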
johanl-db authored and anoopj committed Feb 24, 2025
1 parent 65d5dd9 commit 789afe9
Showing 10 changed files with 225 additions and 32 deletions.
14 changes: 14 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2319,6 +2319,13 @@
"Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
"Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.",
"",
"Using dataframe reader option(s):",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeOptions>",
"To unblock for this particular stream:",
"<unblockStreamOptions>",
"",
"Using SQL configuration(s):",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeConfs>",
"To unblock for this particular stream:",
@@ -2338,6 +2345,13 @@
"Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
"Once you have updated your streaming query or have decided there is no need to update it, you can set the following configuration to unblock the type change(s) and continue stream processing.",
"",
"Using dataframe reader option:",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeOptions>",
"To unblock for this particular stream:",
"<unblockStreamOptions>",
"",
"Using SQL configuration:",
"To unblock for this particular stream just for this series of schema change(s):",
"<unblockChangeConfs>",
"To unblock for this particular stream:",
18 changes: 18 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3136,7 +3136,14 @@ trait DeltaErrorsBase
previousSchemaChangeVersion: Long,
currentSchemaChangeVersion: Long,
checkpointHash: Int,
readerOptionsUnblock: Seq[String],
sqlConfsUnblock: Seq[String]): Throwable = {
val unblockChangeOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "$currentSchemaChangeVersion")"""
}.mkString("\n")
val unblockStreamOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "always")"""
}.mkString("\n")
val unblockChangeConfs = sqlConfsUnblock.map { conf =>
s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
}.mkString("\n")
@@ -3154,6 +3161,8 @@ trait DeltaErrorsBase
previousSchemaChangeVersion.toString,
currentSchemaChangeVersion.toString,
currentSchemaChangeVersion.toString,
unblockChangeOptions,
unblockStreamOptions,
unblockChangeConfs,
unblockStreamConfs,
unblockAllConfs
@@ -3165,6 +3174,7 @@ trait DeltaErrorsBase
previousSchemaChangeVersion: Long,
currentSchemaChangeVersion: Long,
checkpointHash: Int,
readerOptionsUnblock: Seq[String],
sqlConfsUnblock: Seq[String],
wideningTypeChanges: Seq[TypeChange]): Throwable = {

@@ -3173,6 +3183,12 @@ trait DeltaErrorsBase
s"${change.toType.sql}"
}.mkString("\n")

val unblockChangeOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "$currentSchemaChangeVersion")"""
}.mkString("\n")
val unblockStreamOptions = readerOptionsUnblock.map { option =>
s""" .option("$option", "always")"""
}.mkString("\n")
val unblockChangeConfs = sqlConfsUnblock.map { conf =>
s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;"""
}.mkString("\n")
@@ -3190,6 +3206,8 @@ trait DeltaErrorsBase
currentSchemaChangeVersion.toString,
wideningTypeChangesStr,
currentSchemaChangeVersion.toString,
unblockChangeOptions,
unblockStreamOptions,
unblockChangeConfs,
unblockStreamConfs,
unblockAllConfs
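For reference, the two fragments above render one `.option(...)` line per reader option; a standalone sketch of the same `mkString` logic with sample values:
```
val readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange")
val currentSchemaChangeVersion = 1L

// One ".option(...)" suggestion per reader option, joined with newlines.
val unblockChangeOptions = readerOptionsUnblock.map { option =>
  s"""  .option("$option", "$currentSchemaChangeVersion")"""
}.mkString("\n")

println(unblockChangeOptions)
//   .option("allowSourceColumnRename", "1")
//   .option("allowSourceColumnTypeChange", "1")
```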
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -212,6 +212,12 @@ trait DeltaReadOptions extends DeltaOptionParser
val schemaTrackingLocation = options.get(SCHEMA_TRACKING_LOCATION)

val sourceTrackingId = options.get(STREAMING_SOURCE_TRACKING_ID)

val allowSourceColumnRename = options.get(ALLOW_SOURCE_COLUMN_RENAME)

val allowSourceColumnDrop = options.get(ALLOW_SOURCE_COLUMN_DROP)

val allowSourceColumnTypeChange = options.get(ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}


@@ -289,6 +295,10 @@ object DeltaOptions extends DeltaLogging
*/
val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId"

val ALLOW_SOURCE_COLUMN_DROP = "allowSourceColumnDrop"
val ALLOW_SOURCE_COLUMN_RENAME = "allowSourceColumnRename"
val ALLOW_SOURCE_COLUMN_TYPE_CHANGE = "allowSourceColumnTypeChange"

/**
* An option to control if delta will write partition columns to data files
*/
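The new accessors expose the raw option values as `Option[String]`, resolved case-insensitively; a small sketch of how they behave, assuming an active `spark` session (`DeltaOptions` is constructed from the raw stream parameters, as the schema evolution check below does):
```
// Build DeltaOptions from raw stream parameters, mirroring
// `new DeltaOptions(parameters, spark.sessionState.conf)` below.
val parameters = Map("allowSourceColumnDrop" -> "always")
val options = new DeltaOptions(parameters, spark.sessionState.conf)

options.allowSourceColumnDrop   // Some("always")
options.allowSourceColumnRename // None: option not set
```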
@@ -439,7 +439,6 @@ object DeltaDataSource extends DatabricksLogging
parameters: Map[String, String],
sourceMetadataPathOpt: Option[String] = None,
mergeConsecutiveSchemaChanges: Boolean = false): Option[DeltaSourceMetadataTrackingLog] = {
val options = new CaseInsensitiveStringMap(parameters.asJava)

DeltaDataSource.extractSchemaTrackingLocationConfig(spark, parameters)
.map { schemaTrackingLocation =>
@@ -451,7 +450,7 @@

DeltaSourceMetadataTrackingLog.create(
spark, schemaTrackingLocation, sourceSnapshot,
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)),
parameters,
sourceMetadataPathOpt,
mergeConsecutiveSchemaChanges
)
@@ -449,7 +449,7 @@ object DeltaSourceMetadataEvolutionSupport

/**
* Defining the different combinations of non-additive schema changes to detect them and allow
* users to vet and unblock them using a corresponding SQL conf:
* users to vet and unblock them using a corresponding SQL conf or reader option:
* - dropping columns
* - renaming columns
* - widening data types
@@ -460,48 +460,61 @@
val isDrop: Boolean
val isTypeWidening: Boolean
val sqlConfsUnblock: Seq[String]
val readerOptionsUnblock: Seq[String]
}

// Single types of schema change, typically caused by a single ALTER TABLE operation.
private case object SchemaChangeRename extends SchemaChangeType {
override val name = "RENAME COLUMN"
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
override val (isRename, isDrop, isTypeWidening) = (true, false, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME)
override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME)
}
private case object SchemaChangeDrop extends SchemaChangeType {
override val name = "DROP COLUMN"
override val (isRename, isDrop, isTypeWidening) = (false, true, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP)
override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
}
private case object SchemaChangeTypeWidening extends SchemaChangeType {
override val name = "TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (false, false, true)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}

// Combinations of rename, drop and type change -> can be caused by a complete overwrite.
private case object SchemaChangeRenameAndDrop extends SchemaChangeType {
override val name = "RENAME AND DROP COLUMN"
override val (isRename, isDrop, isTypeWidening) = (true, true, false)
override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP)
}
private case object SchemaChangeRenameAndTypeWidening extends SchemaChangeType {
override val name = "RENAME AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (true, false, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_RENAME, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}
private case object SchemaChangeDropAndTypeWidening extends SchemaChangeType {
override val name = "DROP AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (false, true, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}
private case object SchemaChangeRenameAndDropAndTypeWidening extends SchemaChangeType {
override val name = "RENAME, DROP AND TYPE WIDENING"
override val (isRename, isDrop, isTypeWidening) = (true, true, true)
override val sqlConfsUnblock: Seq[String] =
Seq(SQL_CONF_UNBLOCK_RENAME_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE)
override val readerOptionsUnblock: Seq[String] =
Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP,
DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE)
}

private final val allSchemaChangeTypes = Seq(
@@ -541,11 +554,12 @@

/**
* Returns whether the given type of non-additive schema change was unblocked by setting one of
* the corresponding SQL confs.
* the corresponding SQL confs or reader options.
*/
private def isChangeUnblocked(
spark: SparkSession,
change: SchemaChangeType,
options: DeltaOptions,
checkpointHash: Int,
schemaChangeVersion: Long): Boolean = {

@@ -561,11 +575,20 @@
validConfKeysValuePair.exists(p => getConf(p._1).contains(p._2))
}

val isBlockedRename = change.isRename && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
def isUnblockedByReaderOption(readerOption: Option[String]): Boolean = {
readerOption.contains("always") || readerOption.contains(schemaChangeVersion.toString)
}

val isBlockedRename = change.isRename &&
!isUnblockedByReaderOption(options.allowSourceColumnRename) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
val isBlockedDrop = change.isDrop && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
val isBlockedDrop = change.isDrop &&
!isUnblockedByReaderOption(options.allowSourceColumnDrop) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP)
val isBlockedTypeChange = change.isTypeWidening &&
!isUnblockedByReaderOption(options.allowSourceColumnTypeChange) &&
!isUnblockedBySQLConf(SQL_CONF_UNBLOCK_TYPE_CHANGE)

!isBlockedRename && !isBlockedDrop && !isBlockedTypeChange
@@ -576,7 +599,7 @@
/**
* Whether to accept widening type changes:
* - when true, widening type changes cause the stream to fail, requesting user to review and
* unblock them via a SQL conf.
* unblock them via a SQL conf or reader option.
* - when false, widening type changes are rejected without possibility to unblock, similar to
* any other arbitrary type change.
*/
@@ -595,34 +618,45 @@
// scalastyle:off
/**
* Given a non-additive operation type from a previous schema evolution, check we can process
* using the new schema given any SQL conf users have explicitly set to unblock.
* using the new schema given any SQL conf or dataframe reader option users have explicitly set to
* unblock.
* The SQL conf can take one of the following formats:
* 1. spark.databricks.delta.streaming.allowSourceColumn$action = "always"
* -> allows non-additive schema change to propagate for all streams.
* 2. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = "always"
* -> allows non-additive schema change to propagate for this particular stream.
* 3. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = $deltaVersion
* -> allow non-additive schema change to propagate only for this particular stream source
* -> allow non-additive schema change to propagate only for this particular stream source
* table version.
* The reader options can take one of the following formats:
* 1. .option("allowSourceColumn$action", "always")
* -> allows non-additive schema change to propagate for this particular stream.
* 2. .option("allowSourceColumn$action", "$deltaVersion")
* -> allow non-additive schema change to propagate only for this particular stream source
* table version.
* where `allowSourceColumn$action` is one of:
* 1. `allowSourceColumnRename` to allow column renames.
* 2. `allowSourceColumnDrop` to allow column drops.
* 3. `allowSourceColumnRenameAndDrop` to allow both column drops and renames.
* 4. `allowSourceColumnTypeChange` to allow widening type changes.
* 3. `allowSourceColumnTypeChange` to allow widening type changes.
* For SQL confs only, action can also be `allowSourceColumnRenameAndDrop` to allow both column
* drops and renames.
*
* We will check for any of these configs given the non-additive operation, and throw a proper
* error message to instruct the user to set the SQL conf if they would like to unblock.
* error message to instruct the user to set the SQL conf / reader options if they would like to
* unblock.
*
* @param metadataPath The path to the source-unique metadata location under checkpoint
* @param currentSchema The current persisted schema
* @param previousSchema The previous persisted schema
*/
// scalastyle:on
protected[sources] def validateIfSchemaChangeCanBeUnblockedWithSQLConf(
protected[sources] def validateIfSchemaChangeCanBeUnblocked(
spark: SparkSession,
parameters: Map[String, String],
metadataPath: String,
currentSchema: PersistedMetadata,
previousSchema: PersistedMetadata): Unit = {
val options = new DeltaOptions(parameters, spark.sessionState.conf)
val checkpointHash = getCheckpointHash(metadataPath)

// The start version of a possible series of consecutive schema changes.
@@ -644,7 +678,7 @@
determineNonAdditiveSchemaChangeType(
spark, currentSchema.dataSchema, previousSchema.dataSchema).foreach { change =>
if (!isChangeUnblocked(
spark, change, checkpointHash, currentSchemaChangeVersion)) {
spark, change, options, checkpointHash, currentSchemaChangeVersion)) {
// Throw error to prompt user to set the correct confs
change match {
case SchemaChangeTypeWidening =>
Expand All @@ -656,6 +690,7 @@ object DeltaSourceMetadataEvolutionSupport {
previousSchemaChangeVersion,
currentSchemaChangeVersion,
checkpointHash,
change.readerOptionsUnblock,
change.sqlConfsUnblock,
wideningTypeChanges)

Expand All @@ -665,6 +700,7 @@ object DeltaSourceMetadataEvolutionSupport {
previousSchemaChangeVersion,
currentSchemaChangeVersion,
checkpointHash,
change.readerOptionsUnblock,
change.sqlConfsUnblock)
}
}
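The reader-option check reduces to a small predicate: a change is unblocked if the option is set to "always" or to the exact Delta version of the schema change. A self-contained sketch of that rule, mirroring `isUnblockedByReaderOption` above:
```
// Same rule as isUnblockedByReaderOption: the option value must be
// "always" or the exact version at which the schema change happened.
def isUnblockedByReaderOption(readerOption: Option[String], schemaChangeVersion: Long): Boolean =
  readerOption.contains("always") || readerOption.contains(schemaChangeVersion.toString)

isUnblockedByReaderOption(Some("always"), 2L) // true
isUnblockedByReaderOption(Some("2"), 2L)      // true
isUnblockedByReaderOption(Some("3"), 2L)      // false: wrong version
isUnblockedByReaderOption(None, 2L)           // false: option not set
```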
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources
// scalastyle:off import.ordering.noEmptyLine
import java.io.InputStream

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.streaming.{JsonSchemaSerializer, PartitionAndDataSchema, SchemaTrackingLog}
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// scalastyle:on import.ordering.noEmptyLine

/**
@@ -240,10 +242,12 @@ object DeltaSourceMetadataTrackingLog extends Logging
sparkSession: SparkSession,
rootMetadataLocation: String,
sourceSnapshot: SnapshotDescriptor,
sourceTrackingId: Option[String] = None,
parameters: Map[String, String],
sourceMetadataPathOpt: Option[String] = None,
mergeConsecutiveSchemaChanges: Boolean = false,
initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = {
val options = new CaseInsensitiveStringMap(parameters.asJava)
val sourceTrackingId = Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID))
val metadataTrackingLocation = fullMetadataTrackingLocation(
rootMetadataLocation, sourceSnapshot.deltaLog.tableId, sourceTrackingId)
val log = new DeltaSourceMetadataTrackingLog(
@@ -296,7 +300,8 @@
(log.getPreviousTrackedMetadata, log.getCurrentTrackedMetadata, sourceMetadataPathOpt) match {
case (Some(prev), Some(curr), Some(metadataPath)) =>
DeltaSourceMetadataEvolutionSupport
.validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev)
.validateIfSchemaChangeCanBeUnblocked(
sparkSession, parameters, metadataPath, curr, prev)
case _ =>
}

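`create` now derives the tracking ID from the raw parameters itself; `CaseInsensitiveStringMap.get` returns null for absent keys, which `Option(...)` turns into `None`. A minimal sketch of that extraction:
```
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val parameters = Map("streamingSourceTrackingId" -> "src1")
val options = new CaseInsensitiveStringMap(parameters.asJava)

// get() returns null when the key is absent; Option(...) folds that to None.
Option(options.get("streamingSourceTrackingId")) // Some("src1")
Option(options.get("someOtherOption"))           // None
```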
@@ -2732,6 +2732,7 @@ trait DeltaErrorsSuiteBase
nonAdditiveSchemaChangeOpType = "RENAME AND TYPE WIDENING",
previousSchemaChangeVersion = 0,
currentSchemaChangeVersion = 1,
readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange"),
sqlConfsUnblock = Seq(
"spark.databricks.delta.streaming.allowSourceColumnRename",
"spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
@@ -2743,6 +2744,12 @@
"opType" -> "RENAME AND TYPE WIDENING",
"previousSchemaChangeVersion" -> "0",
"currentSchemaChangeVersion" -> "1",
"unblockChangeOptions" ->
s""" .option("allowSourceColumnRename", "1")
| .option("allowSourceColumnTypeChange", "1")""".stripMargin,
"unblockStreamOptions" ->
s""" .option("allowSourceColumnRename", "always")
| .option("allowSourceColumnTypeChange", "always")""".stripMargin,
"unblockChangeConfs" ->
s""" SET spark.databricks.delta.streaming.allowSourceColumnRename.ckpt_15 = 1;
| SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;""".stripMargin,
@@ -2760,6 +2767,7 @@
throw DeltaErrors.cannotContinueStreamingTypeWidening(
previousSchemaChangeVersion = 0,
currentSchemaChangeVersion = 1,
readerOptionsUnblock = Seq("allowSourceColumnTypeChange"),
sqlConfsUnblock = Seq("spark.databricks.delta.streaming.allowSourceColumnTypeChange"),
checkpointHash = 15,
wideningTypeChanges = Seq(TypeChange(None, IntegerType, LongType, Seq("a"))))
@@ -2770,6 +2778,8 @@
"previousSchemaChangeVersion" -> "0",
"currentSchemaChangeVersion" -> "1",
"wideningTypeChanges" -> " a: INT -> BIGINT",
"unblockChangeOptions" -> " .option(\"allowSourceColumnTypeChange\", \"1\")",
"unblockStreamOptions" -> " .option(\"allowSourceColumnTypeChange\", \"always\")",
"unblockChangeConfs" ->
" SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;",
"unblockStreamConfs" ->