Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Mar 3, 2025
1 parent fd6f7cd commit 06a5c2f
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 19 deletions.
4 changes: 4 additions & 0 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,10 @@ def test_addFeatureSupport(self) -> None:
["appendOnly", "deletionVectors", "invariants"])

def test_dropFeatureSupport(self) -> None:
# The expected results below are based on drop feature with history truncation.
# The fast drop feature adds a writer feature when a feature is dropped. The relevant
# behavior is tested in the DeltaFastDropFeatureSuite.
# in the DeltaFastDropFeatureSuite.
self.spark.conf.set('spark.databricks.delta.tableFeatures.fastDropFeature.enabled', 'false')
dt = self.__create_df_for_feature_tests()

dt.addFeatureSupport("testRemovableWriter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,13 @@ trait DeltaSQLConfBase {
.createWithDefault(false)

// Controls whether ALTER TABLE ... DROP FEATURE uses the fast drop feature path
// (adds a writer feature instead of truncating history). Defaults to enabled; test
// suites that target the legacy history-truncation path disable it explicitly.
val FAST_DROP_FEATURE_ENABLED =
  buildConf("tableFeatures.fastDropFeature.enabled")
    .internal()
    .doc(
      """Whether to allow dropping features with the fast drop feature
        |functionality.""".stripMargin)
    .booleanConf
    .createWithDefault(true)

val FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED =
buildConf("tableFeatures.dev.fastDropFeature.DVDiscoveryInVacuum.disabled")
Expand All @@ -460,7 +459,7 @@ trait DeltaSQLConfBase {
.createWithDefault(false)

val FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES =
buildConf("tableFeatures.dev.fastDropFeature.generateDVTombstones.enabled")
buildConf("tableFeatures.fastDropFeature.generateDVTombstones.enabled")
.internal()
.doc(
"""Whether to generate DV tombstones when dropping deletion vectors.
Expand All @@ -470,7 +469,7 @@ trait DeltaSQLConfBase {
.createWithDefaultFunction(() => SQLConf.get.getConf(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED))

val FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD =
buildConf("tableFeatures.dev.fastDropFeature.dvTombstoneCountThreshold")
buildConf("tableFeatures.fastDropFeature.dvTombstoneCountThreshold")
.doc(
"""The maximum number of DV tombstones we are allowed to store in memory when dropping
|deletion vectors. When the resulting number of DV tombstones is higher, we use
Expand All @@ -481,7 +480,7 @@ trait DeltaSQLConfBase {
.createWithDefault(10000)

val FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL =
buildConf("tableFeatures.dev.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
buildConf("tableFeatures.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
.internal()
.doc(
"""Whether to validate the protocol when starting a stream from arbitrary
Expand Down
9 changes: 8 additions & 1 deletion spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.language.postfixOps
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
Expand Down Expand Up @@ -208,8 +209,14 @@ class DeltaTableHadoopOptionsSuite extends QueryTest

import testImplicits._

// Build the suite's SparkConf. The drop feature tests below target the drop feature
// with history truncation implementation; the fast drop feature implementation adds a
// new writer feature when dropping a feature, does not require any waiting time, and is
// tested extensively in DeltaFastDropFeatureSuite, so it is disabled here.
//
// NOTE: the two settings must be chained on a single `super.sparkConf` call. Calling
// `super.sparkConf` twice builds two separate conf objects and only the last expression
// is returned, silently dropping the fake log store setting.
protected override def sparkConf = {
  super.sparkConf
    .set("spark.delta.logStore.fake.impl", classOf[LocalLogStore].getName)
    .set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
}

/**
 * Create Hadoop file system options for `FakeFileSystem`. If Delta doesn't pick them up,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ class DeltaFastDropFeatureSuite
}
}

test("Drop CheckpointProtectionTableFeature") {
for (withFastDropFeatureEnabled <- BOOLEAN_DOMAIN)
test("Drop CheckpointProtectionTableFeature " +
s"withFastDropFeatureEnabled: $withFastDropFeatureEnabled") {
withTempDir { dir =>
val clock = new ManualClock(System.currentTimeMillis())
val deltaLog = DeltaLog.forTable(spark, new Path(dir.getAbsolutePath), clock)
Expand All @@ -334,17 +336,20 @@ class DeltaFastDropFeatureSuite
val checkpointProtectionVersion =
CheckpointProtectionTableFeature.getCheckpointProtectionVersion(deltaLog.update())

val e = intercept[DeltaTableFeatureException] {
dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
}
checkError(
e,
"DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))
withSQLConf(
DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key -> withFastDropFeatureEnabled.toString) {
val e = intercept[DeltaTableFeatureException] {
dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
}
checkError(
e,
"DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))

clock.advance(TimeUnit.HOURS.toMillis(48))
clock.advance(TimeUnit.HOURS.toMillis(48))

dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
}

val snapshot = deltaLog.update()
val protocol = snapshot.protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
// `.schema` generates NOT NULL columns which requires writer protocol 2. We convert all to
// NULLable to avoid silent writer protocol version bump.
private lazy val testTableSchema = spark.range(1).schema.asNullable
// The drop feature tests in this suite exercise the legacy drop feature with history
// truncation code path. The fast drop feature path is covered separately and
// extensively by DeltaFastDropFeatureSuite, so it is switched off for this suite.
override protected def sparkConf: SparkConf =
  super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")

// This is solely a test hook. Users cannot create new Delta tables with protocol lower than
// that of their current version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class DeltaTableFeatureSuite
with DeltaSQLCommandTest {

private lazy val testTableSchema = spark.range(1).schema
// Disable the fast drop feature implementation for this suite: its drop feature tests
// assume the history-truncation behavior (no extra writer feature is added on drop and
// a waiting period applies). The fast drop path has its own dedicated coverage in
// DeltaFastDropFeatureSuite.
override protected def sparkConf: SparkConf =
  super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")

// This is solely a test hook. Users cannot create new Delta tables with protocol lower than
// that of their current version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaConfigs._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.catalyst.TableIdentifier
Expand All @@ -31,6 +32,14 @@ import org.apache.spark.util.ManualClock
*/
class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {

// Suite-level setup: turn off the fast drop feature implementation. The drop feature
// tests here depend on the waiting-time semantics of the history-truncation drop path,
// which the fast drop path does not have; fast drop is tested extensively in
// DeltaFastDropFeatureSuite.
override def beforeAll(): Unit = {
  super.beforeAll()
  val fastDropKey = DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key
  spark.conf.set(fastDropKey, false.toString)
}

val clock = new ManualClock(System.currentTimeMillis())
test("column mapping cannot be dropped without the feature flag") {
withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{RemoveFile, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.DeltaFileOperations
import com.google.common.math.DoubleMath
Expand All @@ -46,6 +47,11 @@ trait TypeWideningTestMixin extends DeltaSQLCommandTest with DeltaDMLTestUtils {
.set(SQLConf.ANSI_ENABLED.key, "true")
// Rebase mode must be set explicitly to allow writing dates before 1582-10-15.
.set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)
// All the drop feature tests below are based on the drop feature with history truncation
// implementation. The fast drop feature implementation does not require any waiting time.
// The fast drop feature implementation is tested extensively in the
// DeltaFastDropFeatureSuite.
.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, false.toString)
}

/** Enable (or disable) type widening for the table under the given path. */
Expand Down

0 comments on commit 06a5c2f

Please sign in to comment.