[Spark] Enable Fast Drop feature by default (#4212)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Enable Fast Drop feature by default.

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

Yes. The current drop feature command must be run twice, with a 24-hour
waiting period in between. In addition, it truncates the history of the
Delta table to the last 24 hours.

This PR introduces a new DROP FEATURE implementation that allows dropping
features instantly without truncating history. Dropping a feature adds a
new writer feature to the table, the `CheckpointProtectionTableFeature`.

A feature can be dropped with the new behaviour as follows:
`ALTER TABLE x DROP FEATURE y`

A feature can still be dropped with the old behaviour as follows:
`ALTER TABLE x DROP FEATURE y TRUNCATE HISTORY`

Finally, the `CheckpointProtectionTableFeature` can be dropped similarly
to any other feature:
`ALTER TABLE x DROP FEATURE CheckpointProtectionTableFeature TRUNCATE HISTORY`
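The commands above can be sketched end to end; table and feature names below are illustrative (`my_table` and `deletionVectors` stand in for any table and any removable feature):

```sql
-- Fast drop (the new default): completes immediately, keeps table history,
-- and adds CheckpointProtectionTableFeature to the table's protocol.
ALTER TABLE my_table DROP FEATURE deletionVectors;

-- Legacy behaviour: must be run twice, 24 hours apart, and truncates
-- the table history to the last 24 hours.
ALTER TABLE my_table DROP FEATURE deletionVectors TRUNCATE HISTORY;

-- The protection feature itself can later be dropped like any other feature.
ALTER TABLE my_table DROP FEATURE CheckpointProtectionTableFeature TRUNCATE HISTORY;
```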
andreaschat-db authored Mar 4, 2025
1 parent 59e3ccf commit 86460aa
Showing 8 changed files with 64 additions and 20 deletions.
4 changes: 4 additions & 0 deletions python/delta/tests/test_deltatable.py
@@ -1221,6 +1221,10 @@ def test_addFeatureSupport(self) -> None:
["appendOnly", "deletionVectors", "invariants"])

def test_dropFeatureSupport(self) -> None:
+        # The expected results below are based on drop feature with history truncation.
+        # Fast drop feature, adds a writer feature when dropped. The relevant behavior is tested
+        # in the DeltaFastDropFeatureSuite.
+        self.spark.conf.set('spark.databricks.delta.tableFeatures.fastDropFeature.enabled', 'false')
dt = self.__create_df_for_feature_tests()

dt.addFeatureSupport("testRemovableWriter")
@@ -460,14 +460,13 @@ trait DeltaSQLConfBase {
.createWithDefault(false)

   val FAST_DROP_FEATURE_ENABLED =
-    buildConf("tableFeatures.dev.fastDropFeature.enabled")
+    buildConf("tableFeatures.fastDropFeature.enabled")
       .internal()
       .doc(
-        """Whether to enable the fast drop feature feature functionality.
-          |This feature is currently in development and this config is only intended to be enabled
-          |for testing purposes.""".stripMargin)
+        """Whether to allow dropping features with the fast drop feature feature
+          |functionality.""".stripMargin)
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

val FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED =
buildConf("tableFeatures.dev.fastDropFeature.DVDiscoveryInVacuum.disabled")
@@ -479,7 +478,7 @@
.createWithDefault(false)

   val FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES =
-    buildConf("tableFeatures.dev.fastDropFeature.generateDVTombstones.enabled")
+    buildConf("tableFeatures.fastDropFeature.generateDVTombstones.enabled")
.internal()
.doc(
"""Whether to generate DV tombstones when dropping deletion vectors.
@@ -489,7 +488,7 @@
.createWithDefaultFunction(() => SQLConf.get.getConf(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED))

   val FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD =
-    buildConf("tableFeatures.dev.fastDropFeature.dvTombstoneCountThreshold")
+    buildConf("tableFeatures.fastDropFeature.dvTombstoneCountThreshold")
.doc(
"""The maximum number of DV tombstones we are allowed store to memory when dropping
|deletion vectors. When the resulting number of DV tombstones is higher, we use
@@ -500,7 +499,7 @@
.createWithDefault(10000)

   val FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL =
-    buildConf("tableFeatures.dev.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
+    buildConf("tableFeatures.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
.internal()
.doc(
"""Whether to validate the protocol when starting a stream from arbitrary
12 changes: 10 additions & 2 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -24,6 +24,7 @@ import scala.language.postfixOps
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -208,8 +209,15 @@ class DeltaTableHadoopOptionsSuite extends QueryTest

import testImplicits._

-  protected override def sparkConf =
-    super.sparkConf.set("spark.delta.logStore.fake.impl", classOf[LocalLogStore].getName)
+  protected override def sparkConf = {
+    // The drop feature test below is targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation adds a new writer feature when dropping
+    // a feature and also does not require any waiting time. The fast drop feature implementation
+    // is tested extensively in the DeltaFastDropFeatureSuite.
+    super.sparkConf
+      .set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+      .set("spark.delta.logStore.fake.impl", classOf[LocalLogStore].getName)
+  }

/**
* Create Hadoop file system options for `FakeFileSystem`. If Delta doesn't pick up them,
@@ -312,7 +312,9 @@ class DeltaFastDropFeatureSuite
}
}

-  test("Drop CheckpointProtectionTableFeature") {
+  for (withFastDropFeatureEnabled <- BOOLEAN_DOMAIN)
+  test("Drop CheckpointProtectionTableFeature " +
+      s"withFastDropFeatureEnabled: $withFastDropFeatureEnabled") {
withTempDir { dir =>
val clock = new ManualClock(System.currentTimeMillis())
val deltaLog = DeltaLog.forTable(spark, new Path(dir.getAbsolutePath), clock)
@@ -334,17 +336,20 @@ class DeltaFastDropFeatureSuite
val checkpointProtectionVersion =
CheckpointProtectionTableFeature.getCheckpointProtectionVersion(deltaLog.update())

-      val e = intercept[DeltaTableFeatureException] {
-        dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
-      }
-      checkError(
-        e,
-        "DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
-        parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))
+      withSQLConf(
+          DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key -> withFastDropFeatureEnabled.toString) {
+        val e = intercept[DeltaTableFeatureException] {
+          dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+        }
+        checkError(
+          e,
+          "DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))

-      clock.advance(TimeUnit.HOURS.toMillis(48))
+        clock.advance(TimeUnit.HOURS.toMillis(48))

-      dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+        dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+      }

val snapshot = deltaLog.update()
val protocol = snapshot.protocol
@@ -61,6 +61,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
// `.schema` generates NOT NULL columns which requires writer protocol 2. We convert all to
// NULLable to avoid silent writer protocol version bump.
private lazy val testTableSchema = spark.range(1).schema.asNullable
+  override protected def sparkConf: SparkConf = {
+    // All the drop feature tests below are targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation is tested extensively in
+    // DeltaFastDropFeatureSuite.
+    super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+  }

// This is solely a test hook. Users cannot create new Delta tables with protocol lower than
// that of their current version.
@@ -40,6 +40,13 @@ class DeltaTableFeatureSuite
with DeltaSQLCommandTest {

private lazy val testTableSchema = spark.range(1).schema
+  override protected def sparkConf: SparkConf = {
+    // All the drop feature tests below are targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation adds a new writer feature when dropping
+    // a feature and also does not require any waiting time. The fast drop feature implementation
+    // is tested extensively in the DeltaFastDropFeatureSuite.
+    super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+  }

// This is solely a test hook. Users cannot create new Delta tables with protocol lower than
// that of their current version.
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaConfigs._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,6 +32,14 @@ import org.apache.spark.util.ManualClock
*/
class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {

+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // All the drop feature tests below are based on the drop feature with history truncation
+    // implementation. The fast drop feature implementation does not require any waiting time.
+    // The fast drop feature implementation is tested extensively in the DeltaFastDropFeatureSuite.
+    spark.conf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, false.toString)
+  }

val clock = new ManualClock(System.currentTimeMillis())
test("column mapping cannot be dropped without the feature flag") {
withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{RemoveFile, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.DeltaFileOperations
import com.google.common.math.DoubleMath
@@ -46,6 +47,11 @@ trait TypeWideningTestMixin extends DeltaSQLCommandTest with DeltaDMLTestUtils {
.set(SQLConf.ANSI_ENABLED.key, "true")
// Rebase mode must be set explicitly to allow writing dates before 1582-10-15.
.set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)
+      // All the drop feature tests below are based on the drop feature with history truncation
+      // implementation. The fast drop feature implementation does not require any waiting time.
+      // The fast drop feature implementation is tested extensively in the
+      // DeltaFastDropFeatureSuite.
+      .set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, false.toString)
}

/** Enable (or disable) type widening for the table under the given path. */
