From a16a2fd96eecf1a209274bdbc7315ac4768df377 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Wed, 19 Feb 2025 17:52:19 +0100
Subject: [PATCH] [Spark] Add dataframe reader options to unblock non-additive schema changes (#4126)

## Description
Non-additive schema changes in streaming - DROP/RENAME and, since
https://github.com/databricks-eng/runtime/pull/124363, type changes - block the stream until
the user sets a SQL conf to unblock them:
```
spark.databricks.delta.streaming.allowSourceColumnRename
spark.databricks.delta.streaming.allowSourceColumnDrop
spark.databricks.delta.streaming.allowSourceColumnTypeChange
```
This change adds dataframe reader options as an alternative to SQL confs to unblock
non-additive schema changes:
```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```

## How was this patch tested?
Extended existing tests in `DeltaSourceMetadataEvolutionSupportSuite` to also cover dataframe
reader options.

## This PR introduces the following *user-facing* changes
The error thrown on non-additive schema changes during streaming is updated to suggest
dataframe reader options in addition to SQL confs to unblock the stream:
```
[DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION] We've detected one or more non-additive schema change(s) (DROP) between Delta version 1 and 2 in the Delta streaming source.
Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version 2.
Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.

Using dataframe reader option(s):
  .option("allowSourceColumnDrop", "true")

Using SQL configuration(s):
To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = 2;
To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = "always";
To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnDrop = "always";
```
The user can use the available reader option to unblock a given type of non-additive schema
change:
```
spark.readStream
  .option("allowSourceColumnRename", "true")
  .option("allowSourceColumnDrop", "true")
  .option("allowSourceColumnTypeChange", "true")
```
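As an illustration, here is a minimal sketch of the end-to-end flow with the new options; the
table and checkpoint paths below are placeholders. Per the reader option semantics documented
in `DeltaSourceMetadataEvolutionSupport`, the value is either the source table version of the
schema change or `"always"`:
```
// Placeholder paths; the unblock check added in this PR runs when the source uses a
// schema tracking log.
val df = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/tmp/checkpoints/events/_schema_log")
  // The source table dropped a column at version 2: vet that specific change, or pass
  // "always" to unblock column drops for this stream going forward.
  .option("allowSourceColumnDrop", "2")
  .load("/tmp/delta/events")

df.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/events")
  .start("/tmp/delta/events_sink")
```
The SQL confs remain available and can be combined with the reader options; the reader options
simply travel with the stream definition rather than the session.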
---
 .../resources/error/delta-error-classes.json | 14 ++++
 .../apache/spark/sql/delta/DeltaErrors.scala | 18 +++++
 .../apache/spark/sql/delta/DeltaOptions.scala | 10 +++
 .../sql/delta/sources/DeltaDataSource.scala | 3 +-
 .../DeltaSourceMetadataEvolutionSupport.scala | 62 ++++++++++++----
 .../DeltaSourceMetadataTrackingLog.scala | 9 ++-
 .../spark/sql/delta/DeltaErrorsSuite.scala | 10 +++
 .../DeltaSourceSchemaEvolutionSuite.scala | 13 ++--
 ...aSourceMetadataEvolutionSupportSuite.scala | 74 ++++++++++++++++---
 .../TypeWideningStreamingSourceSuite.scala | 44 ++++++++++-
 10 files changed, 225 insertions(+), 32 deletions(-)

diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 041f274740a..3749045beb3 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -2319,6 +2319,13 @@
       "Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version .",
       "Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing.",
       "",
+      "Using dataframe reader option(s):",
+      "To unblock for this particular stream just for this series of schema change(s):",
+      "",
+      "To unblock for this particular stream:",
+      "",
+      "",
+      "Using SQL configuration(s):",
       "To unblock for this particular stream just for this series of schema change(s):",
       "",
       "To unblock for this particular stream:",
@@ -2338,6 +2345,13 @@
       "Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version .",
       "Once you have updated your streaming query or have decided there is no need to update it, you can set the following configuration to unblock the type change(s) and continue stream processing.",
       "",
+      "Using dataframe reader option:",
+      "To unblock for this particular stream just for this series of schema change(s):",
+      "",
+      "To unblock for this particular stream:",
+      "",
+      "",
+      "Using SQL configuration:",
       "To unblock for this particular stream just for this series of schema change(s):",
       "",
       "To unblock for this particular stream:",
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 65f15dc251f..57bdec6c070 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3136,7 +3136,14 @@ trait DeltaErrorsBase
      previousSchemaChangeVersion: Long, currentSchemaChangeVersion:
Long, checkpointHash: Int, + readerOptionsUnblock: Seq[String], sqlConfsUnblock: Seq[String]): Throwable = { + val unblockChangeOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "$currentSchemaChangeVersion")""" + }.mkString("\n") + val unblockStreamOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "always")""" + }.mkString("\n") val unblockChangeConfs = sqlConfsUnblock.map { conf => s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;""" }.mkString("\n") @@ -3154,6 +3161,8 @@ trait DeltaErrorsBase previousSchemaChangeVersion.toString, currentSchemaChangeVersion.toString, currentSchemaChangeVersion.toString, + unblockChangeOptions, + unblockStreamOptions, unblockChangeConfs, unblockStreamConfs, unblockAllConfs @@ -3165,6 +3174,7 @@ trait DeltaErrorsBase previousSchemaChangeVersion: Long, currentSchemaChangeVersion: Long, checkpointHash: Int, + readerOptionsUnblock: Seq[String], sqlConfsUnblock: Seq[String], wideningTypeChanges: Seq[TypeChange]): Throwable = { @@ -3173,6 +3183,12 @@ trait DeltaErrorsBase s"${change.toType.sql}" }.mkString("\n") + val unblockChangeOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "$currentSchemaChangeVersion")""" + }.mkString("\n") + val unblockStreamOptions = readerOptionsUnblock.map { option => + s""" .option("$option", "always")""" + }.mkString("\n") val unblockChangeConfs = sqlConfsUnblock.map { conf => s""" SET $conf.ckpt_$checkpointHash = $currentSchemaChangeVersion;""" }.mkString("\n") @@ -3190,6 +3206,8 @@ trait DeltaErrorsBase currentSchemaChangeVersion.toString, wideningTypeChangesStr, currentSchemaChangeVersion.toString, + unblockChangeOptions, + unblockStreamOptions, unblockChangeConfs, unblockStreamConfs, unblockAllConfs diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala index dd01fc0ba2f..626e06e4c88 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala @@ -212,6 +212,12 @@ trait DeltaReadOptions extends DeltaOptionParser { val schemaTrackingLocation = options.get(SCHEMA_TRACKING_LOCATION) val sourceTrackingId = options.get(STREAMING_SOURCE_TRACKING_ID) + + val allowSourceColumnRename = options.get(ALLOW_SOURCE_COLUMN_RENAME) + + val allowSourceColumnDrop = options.get(ALLOW_SOURCE_COLUMN_DROP) + + val allowSourceColumnTypeChange = options.get(ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } @@ -289,6 +295,10 @@ object DeltaOptions extends DeltaLogging { */ val STREAMING_SOURCE_TRACKING_ID = "streamingSourceTrackingId" + val ALLOW_SOURCE_COLUMN_DROP = "allowSourceColumnDrop" + val ALLOW_SOURCE_COLUMN_RENAME = "allowSourceColumnRename" + val ALLOW_SOURCE_COLUMN_TYPE_CHANGE = "allowSourceColumnTypeChange" + /** * An option to control if delta will write partition columns to data files */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index f9a1e0d1e2c..dab3b4411b8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -439,7 +439,6 @@ object DeltaDataSource extends DatabricksLogging { parameters: Map[String, String], sourceMetadataPathOpt: Option[String] = None, mergeConsecutiveSchemaChanges: Boolean = false): 
Option[DeltaSourceMetadataTrackingLog] = { - val options = new CaseInsensitiveStringMap(parameters.asJava) DeltaDataSource.extractSchemaTrackingLocationConfig(spark, parameters) .map { schemaTrackingLocation => @@ -451,7 +450,7 @@ object DeltaDataSource extends DatabricksLogging { DeltaSourceMetadataTrackingLog.create( spark, schemaTrackingLocation, sourceSnapshot, - Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)), + parameters, sourceMetadataPathOpt, mergeConsecutiveSchemaChanges ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala index 9c9687a17ee..9863995e9e5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala @@ -449,7 +449,7 @@ object DeltaSourceMetadataEvolutionSupport { /** * Defining the different combinations of non-additive schema changes to detect them and allow - * users to vet and unblock them using a corresponding SQL conf: + * users to vet and unblock them using a corresponding SQL conf or reader option: * - dropping columns * - renaming columns * - widening data types @@ -460,23 +460,28 @@ object DeltaSourceMetadataEvolutionSupport { val isDrop: Boolean val isTypeWidening: Boolean val sqlConfsUnblock: Seq[String] + val readerOptionsUnblock: Seq[String] } // Single types of schema change, typically caused by a single ALTER TABLE operation. private case object SchemaChangeRename extends SchemaChangeType { override val name = "RENAME COLUMN" - override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME) override val (isRename, isDrop, isTypeWidening) = (true, false, false) + override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME) + override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME) } private case object SchemaChangeDrop extends SchemaChangeType { override val name = "DROP COLUMN" override val (isRename, isDrop, isTypeWidening) = (false, true, false) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP) + override val readerOptionsUnblock: Seq[String] = Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP) } private case object SchemaChangeTypeWidening extends SchemaChangeType { override val name = "TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (false, false, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } // Combinations of rename, drop and type change -> can be caused by a complete overwrite. 
@@ -484,24 +489,32 @@ object DeltaSourceMetadataEvolutionSupport { override val name = "RENAME AND DROP COLUMN" override val (isRename, isDrop, isTypeWidening) = (true, true, false) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP) } private case object SchemaChangeRenameAndTypeWidening extends SchemaChangeType { override val name = "RENAME AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (true, false, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME, DeltaOptions.ALLOW_SOURCE_COLUMN_DROP) } private case object SchemaChangeDropAndTypeWidening extends SchemaChangeType { override val name = "DROP AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (false, true, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } private case object SchemaChangeRenameAndDropAndTypeWidening extends SchemaChangeType { override val name = "RENAME, DROP AND TYPE WIDENING" override val (isRename, isDrop, isTypeWidening) = (true, true, true) override val sqlConfsUnblock: Seq[String] = Seq(SQL_CONF_UNBLOCK_RENAME_DROP, SQL_CONF_UNBLOCK_TYPE_CHANGE) + override val readerOptionsUnblock: Seq[String] = + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_DROP, DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE) } private final val allSchemaChangeTypes = Seq( @@ -541,11 +554,12 @@ object DeltaSourceMetadataEvolutionSupport { /** * Returns whether the given type of non-additive schema change was unblocked by setting one of - * the corresponding SQL confs. + * the corresponding SQL confs or reader options. 
*/ private def isChangeUnblocked( spark: SparkSession, change: SchemaChangeType, + options: DeltaOptions, checkpointHash: Int, schemaChangeVersion: Long): Boolean = { @@ -561,11 +575,20 @@ object DeltaSourceMetadataEvolutionSupport { validConfKeysValuePair.exists(p => getConf(p._1).contains(p._2)) } - val isBlockedRename = change.isRename && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) && + def isUnblockedByReaderOption(readerOption: Option[String]): Boolean = { + readerOption.contains("always") || readerOption.contains(schemaChangeVersion.toString) + } + + val isBlockedRename = change.isRename && + !isUnblockedByReaderOption(options.allowSourceColumnRename) && + !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME) && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP) - val isBlockedDrop = change.isDrop && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) && + val isBlockedDrop = change.isDrop && + !isUnblockedByReaderOption(options.allowSourceColumnDrop) && + !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_DROP) && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_RENAME_DROP) val isBlockedTypeChange = change.isTypeWidening && + !isUnblockedByReaderOption(options.allowSourceColumnTypeChange) && !isUnblockedBySQLConf(SQL_CONF_UNBLOCK_TYPE_CHANGE) !isBlockedRename && !isBlockedDrop && !isBlockedTypeChange @@ -576,7 +599,7 @@ object DeltaSourceMetadataEvolutionSupport { /** * Whether to accept widening type changes: * - when true, widening type changes cause the stream to fail, requesting user to review and - * unblock them via a SQL conf. + * unblock them via a SQL conf or reader option. * - when false, widening type changes are rejected without possibility to unblock, similar to * any other arbitrary type change. */ @@ -595,34 +618,45 @@ object DeltaSourceMetadataEvolutionSupport { // scalastyle:off /** * Given a non-additive operation type from a previous schema evolution, check we can process - * using the new schema given any SQL conf users have explicitly set to unblock. + * using the new schema given any SQL conf or dataframe reader option users have explicitly set to + * unblock. * The SQL conf can take one of following formats: * 1. spark.databricks.delta.streaming.allowSourceColumn$action = "always" * -> allows non-additive schema change to propagate for all streams. * 2. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = "always" * -> allows non-additive schema change to propagate for this particular stream. * 3. spark.databricks.delta.streaming.allowSourceColumn$action.$checkpointHash = $deltaVersion - * -> allow non-additive schema change to propagate only for this particular stream source + * -> allow non-additive schema change to propagate only for this particular stream source + * table version. + * The reader options can take one of the following format: + * 1. .option("allowSourceColumn$action", "always") + * -> allows non-additive schema change to propagate for this particular stream. + * 2. .option("allowSourceColumn$action", "$deltaVersion") + * -> allow non-additive schema change to propagate only for this particular stream source * table version. * where `allowSourceColumn$action` is one of: * 1. `allowSourceColumnRename` to allow column renames. * 2. `allowSourceColumnDrop` to allow column drops. - * 3. `allowSourceColumnRenameAndDrop` to allow both column drops and renames. - * 4. `allowSourceColumnTypeChange` to allow widening type changes. + * 3. `allowSourceColumnTypeChange` to allow widening type changes. 
+ * For SQL confs only, action can also be `allowSourceColumnRenameAndDrop` to allow both column + * drops and renames. * * We will check for any of these configs given the non-additive operation, and throw a proper - * error message to instruct the user to set the SQL conf if they would like to unblock. + * error message to instruct the user to set the SQL conf / reader options if they would like to + * unblock. * * @param metadataPath The path to the source-unique metadata location under checkpoint * @param currentSchema The current persisted schema * @param previousSchema The previous persisted schema */ // scalastyle:on - protected[sources] def validateIfSchemaChangeCanBeUnblockedWithSQLConf( + protected[sources] def validateIfSchemaChangeCanBeUnblocked( spark: SparkSession, + parameters: Map[String, String], metadataPath: String, currentSchema: PersistedMetadata, previousSchema: PersistedMetadata): Unit = { + val options = new DeltaOptions(parameters, spark.sessionState.conf) val checkpointHash = getCheckpointHash(metadataPath) // The start version of a possible series of consecutive schema changes. @@ -644,7 +678,7 @@ object DeltaSourceMetadataEvolutionSupport { determineNonAdditiveSchemaChangeType( spark, currentSchema.dataSchema, previousSchema.dataSchema).foreach { change => if (!isChangeUnblocked( - spark, change, checkpointHash, currentSchemaChangeVersion)) { + spark, change, options, checkpointHash, currentSchemaChangeVersion)) { // Throw error to prompt user to set the correct confs change match { case SchemaChangeTypeWidening => @@ -656,6 +690,7 @@ object DeltaSourceMetadataEvolutionSupport { previousSchemaChangeVersion, currentSchemaChangeVersion, checkpointHash, + change.readerOptionsUnblock, change.sqlConfsUnblock, wideningTypeChanges) @@ -665,6 +700,7 @@ object DeltaSourceMetadataEvolutionSupport { previousSchemaChangeVersion, currentSchemaChangeVersion, checkpointHash, + change.readerOptionsUnblock, change.sqlConfsUnblock) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala index df3e4317424..9abcbcf94d1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.sources // scalastyle:off import.ordering.noEmptyLine import java.io.InputStream +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.spark.sql.delta.streaming.{JsonSchemaSerializer, PartitionAndDataSchema, SchemaTrackingLog} @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap // scalastyle:on import.ordering.noEmptyLine /** @@ -240,10 +242,12 @@ object DeltaSourceMetadataTrackingLog extends Logging { sparkSession: SparkSession, rootMetadataLocation: String, sourceSnapshot: SnapshotDescriptor, - sourceTrackingId: Option[String] = None, + parameters: Map[String, String], sourceMetadataPathOpt: Option[String] = None, mergeConsecutiveSchemaChanges: Boolean = false, initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = { + val options = new CaseInsensitiveStringMap(parameters.asJava) + val sourceTrackingId = 
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)) val metadataTrackingLocation = fullMetadataTrackingLocation( rootMetadataLocation, sourceSnapshot.deltaLog.tableId, sourceTrackingId) val log = new DeltaSourceMetadataTrackingLog( @@ -296,7 +300,8 @@ object DeltaSourceMetadataTrackingLog extends Logging { (log.getPreviousTrackedMetadata, log.getCurrentTrackedMetadata, sourceMetadataPathOpt) match { case (Some(prev), Some(curr), Some(metadataPath)) => DeltaSourceMetadataEvolutionSupport - .validateIfSchemaChangeCanBeUnblockedWithSQLConf(sparkSession, metadataPath, curr, prev) + .validateIfSchemaChangeCanBeUnblocked( + sparkSession, parameters, metadataPath, curr, prev) case _ => } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index c2ac1addfa0..2f737c68f04 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -2732,6 +2732,7 @@ trait DeltaErrorsSuiteBase nonAdditiveSchemaChangeOpType = "RENAME AND TYPE WIDENING", previousSchemaChangeVersion = 0, currentSchemaChangeVersion = 1, + readerOptionsUnblock = Seq("allowSourceColumnRename", "allowSourceColumnTypeChange"), sqlConfsUnblock = Seq( "spark.databricks.delta.streaming.allowSourceColumnRename", "spark.databricks.delta.streaming.allowSourceColumnTypeChange"), @@ -2743,6 +2744,12 @@ trait DeltaErrorsSuiteBase "opType" -> "RENAME AND TYPE WIDENING", "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "1", + "unblockChangeOptions" -> + s""" .option("allowSourceColumnRename", "1") + | .option("allowSourceColumnTypeChange", "1")""".stripMargin, + "unblockStreamOptions" -> + s""" .option("allowSourceColumnRename", "always") + | .option("allowSourceColumnTypeChange", "always")""".stripMargin, "unblockChangeConfs" -> s""" SET spark.databricks.delta.streaming.allowSourceColumnRename.ckpt_15 = 1; | SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;""".stripMargin, @@ -2760,6 +2767,7 @@ trait DeltaErrorsSuiteBase throw DeltaErrors.cannotContinueStreamingTypeWidening( previousSchemaChangeVersion = 0, currentSchemaChangeVersion = 1, + readerOptionsUnblock = Seq("allowSourceColumnTypeChange"), sqlConfsUnblock = Seq("spark.databricks.delta.streaming.allowSourceColumnTypeChange"), checkpointHash = 15, wideningTypeChanges = Seq(TypeChange(None, IntegerType, LongType, Seq("a")))) @@ -2770,6 +2778,8 @@ trait DeltaErrorsSuiteBase "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "1", "wideningTypeChanges" -> " a: INT -> BIGINT", + "unblockChangeOptions" -> " .option(\"allowSourceColumnTypeChange\", \"1\")", + "unblockStreamOptions" -> " .option(\"allowSourceColumnTypeChange\", \"always\")", "unblockChangeConfs" -> " SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_15 = 1;", "unblockStreamConfs" -> diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala index d697052c49c..bf23b924561 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala @@ -222,7 +222,8 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils initializeEagerly: Boolean = true )(implicit log: 
DeltaLog): DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, getDefaultSchemaLocation.toString, log.update(), sourceTrackingId, + spark, getDefaultSchemaLocation.toString, log.update(), + parameters = sourceTrackingId.map(DeltaOptions.STREAMING_SOURCE_TRACKING_ID -> _).toMap, initMetadataLogEagerly = initializeEagerly) protected def getDefaultCheckpoint(implicit log: DeltaLog): Path = @@ -534,8 +535,10 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils // of the case; True concurrent execution would require commit service to protected against. val schemaLocation = getDefaultSchemaLocation.toString val snapshot = log.update() - val schemaLog1 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot) - val schemaLog2 = DeltaSourceMetadataTrackingLog.create(spark, schemaLocation, snapshot) + val schemaLog1 = DeltaSourceMetadataTrackingLog.create( + spark, schemaLocation, snapshot, parameters = Map.empty) + val schemaLog2 = DeltaSourceMetadataTrackingLog.create( + spark, schemaLocation, snapshot, Map.empty) val newSchema = PersistedMetadata("1", 1, makeMetadata(new StructType(), partitionSchema = new StructType()), @@ -1607,9 +1610,9 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils // Both schema log initialized def schemaLog1: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, schemaLog1Location, log.update()) + spark, schemaLog1Location, log.update(), parameters = Map.empty) def schemaLog2: DeltaSourceMetadataTrackingLog = DeltaSourceMetadataTrackingLog.create( - spark, schemaLog2Location, log.update()) + spark, schemaLog2Location, log.update(), parameters = Map.empty) // The schema log initializes @ v5 with schema testStream(df)( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala index 4669b1d2666..13bcde4b303 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.delta.sources -import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaTestUtilsBase, DeltaThrowable} +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaOptions, DeltaTestUtilsBase, DeltaThrowable} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.sql.test.SharedSparkSession @@ -87,7 +87,7 @@ class DeltaSourceMetadataEvolutionSupportSuite } /** - * Unit test runner covering `validateIfSchemaChangeCanBeUnblockedWithSQLConf()`. Takes as input + * Unit test runner covering `validateIfSchemaChangeCanBeUnblocked()`. Takes as input * an initial schema (from) and an updated schema (to) and checks that: * 1. Non-additive schema changes are correctly detected: matches `expectedResult` * 2. Setting SQL confs to unblock the changes allows the check to succeeds. 
@@ -117,28 +117,41 @@ class DeltaSourceMetadataEvolutionSupportSuite unblock: Seq[Seq[String]] = Seq.empty, confs: Seq[(String, String)] = Seq.empty): Unit = test(s"$name") { - def validate(): Unit = - DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblockedWithSQLConf( + def validate(parameters: Map[String, String]): Unit = + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( spark, - metadataPath = "sourceMetadataPath", + parameters, + metadataPath = "sourceMetadataPath", currentSchema = persistedMetadata(toDDL, toPhysicalNames), previousSchema = persistedMetadata(fromDDL, fromPhysicalNames) ) withSQLConf(confs: _*) { expectedResult match { - case ExpectedResult.Success(_) => validate() + case ExpectedResult.Success(_) => validate(parameters = Map.empty) case ExpectedResult.Failure(checkError) => + // Run first without setting any configuration to unblock and check that the validation + // fails => column dropped, renamed or with changed type. val ex = intercept[DeltaThrowable] { - validate() + validate(parameters = Map.empty) } checkError(ex) // Verify that we can unblock using SQL confs for (u <- unblock) { withSQLConfUnblockedChanges(u) { - validate() + validate(parameters = Map.empty) } } + // Verify that we can unblock using dataframe reader options. + for (u <- unblock) { + val parameters = u.flatMap { + case "allowSourceColumnRenameAndDrop" => + Seq(DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME -> "always", + DeltaOptions.ALLOW_SOURCE_COLUMN_DROP -> "always") + case option => Seq(option -> "always") + } + validate(parameters.toMap) + } } } } @@ -598,12 +611,55 @@ class DeltaSourceMetadataEvolutionSupportSuite test("combining individual SQL confs to unblock is supported") { withSQLConfUnblockedChanges(Seq("allowSourceColumnRename", "allowSourceColumnDrop")) { - DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblockedWithSQLConf( + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( + spark, + parameters = Map.empty, + metadataPath = "sourceMetadataPath", + currentSchema = persistedMetadata("a int", Map(Seq("a") -> "b")), + previousSchema = persistedMetadata("a int, b int", Map.empty) + ) + } + } + + test("combining SQL confs and reader options to unblock is supported") { + withSQLConfUnblockedChanges(Seq("allowSourceColumnRename")) { + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( spark, + parameters = Map("allowSourceColumnDrop" -> "always"), metadataPath = "sourceMetadataPath", currentSchema = persistedMetadata("a int", Map(Seq("a") -> "b")), previousSchema = persistedMetadata("a int, b int", Map.empty) ) } } + + test("unblocking column drop for specific version with reader option is supported") { + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( + spark, + parameters = Map("allowSourceColumnDrop" -> "0"), + metadataPath = "sourceMetadataPath", + currentSchema = persistedMetadata("a int", Map.empty), + previousSchema = persistedMetadata("a int, b int", Map.empty) + ) + } + + test("unblocking column rename for specific version with reader option is supported") { + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( + spark, + parameters = Map("allowSourceColumnRename" -> "0"), + metadataPath = "sourceMetadataPath", + currentSchema = persistedMetadata("b int", Map(Seq("b") -> "a")), + previousSchema = persistedMetadata("a int", Map.empty) + ) + } + + test("unblocking column type change for specific version with reader option is 
supported") { + DeltaSourceMetadataEvolutionSupport.validateIfSchemaChangeCanBeUnblocked( + spark, + parameters = Map("allowSourceColumnTypeChange" -> "0"), + metadataPath = "sourceMetadataPath", + currentSchema = persistedMetadata("a int", Map.empty), + previousSchema = persistedMetadata("a byte", Map.empty) + ) + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala index ec93c5c1d43..3db0862dd5a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSourceSuite.scala @@ -396,7 +396,7 @@ trait TypeWideningStreamingSourceTests ("unblock stream", (hash: Int) => s"allowSourceColumnTypeChange.ckpt_$hash", "always"), ("unblock version", (hash: Int) => s"allowSourceColumnTypeChange.ckpt_$hash", "2") )) { - test(s"unblocking stream after type change - $name") { + test(s"unblocking stream with sql conf after type change - $name") { withTempDir { dir => sql(s"CREATE TABLE delta.`$dir` (widened byte, other byte) USING DELTA") // Getting the checkpoint dir through the delta log to ensure the format is consistent with @@ -436,6 +436,44 @@ trait TypeWideningStreamingSourceTests } } + for ((name, optionValue) <- Seq( + ("unblock stream", "always"), + ("unblock version", "2") + )) { + test(s"unblocking stream with reader option after type change - $name") { + withTempDir { dir => + sql(s"CREATE TABLE delta.`$dir` (widened byte, other byte) USING DELTA") + val checkpointDir = new File(dir, "sink_checkpoint") + + def readWithAgg(options: Map[String, String] = Map.empty): DataFrame = + readStream(dir, checkpointDir, options) + .groupBy("other") + .agg(count(col("widened"))) + + testStream(readWithAgg(), outputMode = OutputMode.Complete())( + StartStream(checkpointLocation = checkpointDir.toString), + Execute { _ => sql(s"INSERT INTO delta.`$dir` VALUES (1, 1)") }, + Execute { _ => sql(s"ALTER TABLE delta.`$dir`ALTER COLUMN widened TYPE int") }, + ExpectMetadataEvolutionException() + ) + + testStream(readWithAgg(), outputMode = OutputMode.Complete())( + StartStream(checkpointLocation = checkpointDir.toString), + ExpectTypeChangeBlockedException() + ) + + testStream( + readWithAgg(Map("allowSourceColumnTypeChange" -> optionValue)), + outputMode = OutputMode.Complete())( + StartStream(checkpointLocation = checkpointDir.toString), + Execute { _ => sql(s"INSERT INTO delta.`$dir` VALUES (123456789, 1)") }, + ProcessAllAvailable(), + CheckLastBatch(Row(1, 2)) + ) + } + } + } + test(s"overwrite schema with type change and dropped column") { withTempDir { dir => sql(s"CREATE TABLE delta.`$dir` (a byte, b int) USING DELTA") @@ -470,6 +508,10 @@ trait TypeWideningStreamingSourceTests "opType" -> "DROP AND TYPE WIDENING", "previousSchemaChangeVersion" -> "0", "currentSchemaChangeVersion" -> "2", + "unblockChangeOptions" -> + ".*allowSourceColumnDrop(.|\\n)*allowSourceColumnTypeChange.*", + "unblockStreamOptions" -> + ".*allowSourceColumnDrop(.|\\n)*allowSourceColumnTypeChange.*", "unblockChangeConfs" -> ".*allowSourceColumnDrop(.|\\n)*allowSourceColumnTypeChange.*", "unblockStreamConfs" ->