-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Spark] Add dataframe reader options to unblock non-additive schema changes #4126
Merged
scottsand-db
merged 4 commits into
delta-io:master
from
johanl-db:streaming-non-additive-reader-option
Feb 19, 2025
Merged
[Spark] Add dataframe reader options to unblock non-additive schema changes #4126
scottsand-db
merged 4 commits into
delta-io:master
from
johanl-db:streaming-non-additive-reader-option
Feb 19, 2025
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ed23bd6
to
5bdabda
Compare
tomvanbussel
approved these changes
Feb 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small comment, LGTM otherwise.
@@ -451,7 +451,7 @@ object DeltaDataSource extends DatabricksLogging { | |||
|
|||
DeltaSourceMetadataTrackingLog.create( | |||
spark, schemaTrackingLocation, sourceSnapshot, | |||
Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID)), | |||
parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the val options
created above now, as it's no longer used.
anoopj
pushed a commit
to anoopj/delta
that referenced
this pull request
Feb 24, 2025
…hanges (delta-io#4126) ## Description Non-additive schema changes - DROP/RENAME and, since https://github.com/databricks-eng/runtime/pull/124363 , type changes - in streaming block the stream until the user sets a SQL conf to unblock them: ``` spark.databricks.delta.streaming.allowSourceColumnRename spark.databricks.delta.streaming.allowSourceColumnDrop spark.databricks.delta.streaming.allowSourceColumnTypeChange ``` This change adds dataframe reader options as an alternative to SQL confs to unblock non-additive schema changes: ``` spark.readStream .option("allowSourceColumnRename", "true") .option("allowSourceColumnDrop", "true") .option("allowSourceColumnTypeChange", "true") ``` ## How was this patch tested? Extended existing tests in `DeltaSourceMetadataEvolutionSupportSuite` to also cover dataframe reader options. ## This PR introduces the following *user-facing* changes The error thrown on non-additive schema changes during streaming is updated to suggest dataframe reader options in addition to SQL confs to unblock the stream: ``` [DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION] We've detected one or more non-additive schema change(s) (DROP) between Delta version 1 and 2 in the Delta streaming source. Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version 2. Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set the following configuration(s) to unblock the non-additive schema change(s) and continue stream processing. <NEW> Using dataframe reader option(s): .option("allowSourceColumnDrop", "true") <NEW> Using SQL configuration(s): To unblock for this particular stream just for this series of schema change(s): SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = 2; To unblock for this particular stream: SET spark.databricks.delta.streaming.allowSourceColumnDrop.ckpt_123456 = "always"; To unblock for all streams: SET spark.databricks.delta.streaming.allowSourceColumnDrop= "always"; ``` The user can use the available reader option to unblock a given type of non-additive schema change: ``` spark.readStream .option("allowSourceColumnRename", "true") .option("allowSourceColumnDrop", "true") .option("allowSourceColumnTypeChange", "true") ```
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Non-additive schema changes - DROP/RENAME and, since https://github.com/databricks-eng/runtime/pull/124363 , type changes - in streaming block the stream until the user sets a SQL conf to unblock them:
This change adds dataframe reader options as an alternative to SQL confs to unblock non-additive schema changes:
How was this patch tested?
Extended existing tests in
DeltaSourceMetadataEvolutionSupportSuite
to also cover dataframe reader options.This PR introduces the following user-facing changes
The error thrown on non-additive schema changes during streaming is updated to suggest dataframe reader options in addition to SQL confs to unblock the stream:
The user can use the available reader option to unblock a given type of non-additive schema change: