[Spark] Add protocol validation in Metadata cleanup based on CRCs (#4211)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


`CheckpointProtectionTableFeature` requires clients to clean up metadata only if they can
clean up everything before `requireCheckpointProtectionBeforeVersion`. When this is not
possible, the metadata cleanup operation aborts.
`requireCheckpointProtectionBeforeVersion` is updated to the version of the latest protocol
downgrade every time a feature is dropped. Because of that, in certain scenarios, metadata
cleanup could halt for extended periods of time.

This PR improves this behavior by allowing the client to proceed with metadata cleanup when
the invariant above does not hold, as long as a checkpoint already exists at the cleanup
cut-off version. If neither invariant holds, the client verifies that it supports the
protocols of all commits it plans to remove, including the version at which a new checkpoint
will be created.
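The resulting decision order can be sketched as follows. This is a minimal, illustrative Scala sketch; the names and signatures are simplified and are not the ones used in `MetadataCleanup.scala`:

```scala
// Illustrative sketch of the cleanup decision order described above.
// All names here are hypothetical; see MetadataCleanup.scala for the real logic.
def metadataCleanupAllowedSketch(
    checkpointProtectionVersion: Long,
    firstRetainedVersion: Long,          // first version kept after the proposed cleanup
    checkpointExistsAtBoundary: Boolean, // checkpoint already present at the cut-off version
    allDeletedProtocolsSupported: Boolean): Boolean = {
  // Invariant 1: we can clean up everything before requireCheckpointProtectionBeforeVersion.
  if (firstRetainedVersion >= checkpointProtectionVersion) return true
  // Invariant 2: a checkpoint already exists at the cleanup cut-off version.
  if (checkpointExistsAtBoundary) return true
  // Fallback: the client supports every protocol in the range being removed,
  // including the version where the new checkpoint will be created.
  allDeletedProtocolsSupported
}
```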

## How was this patch tested?

Added tests in `DeltaRetentionSuite`.

## Does this PR introduce _any_ user-facing changes?

No.
andreaschat-db authored Mar 3, 2025
1 parent cee9e37 commit 6dec3d7
Showing 9 changed files with 574 additions and 150 deletions.
@@ -91,6 +91,7 @@ object DeltaLogFileIndex {
lazy val COMMIT_FILE_FORMAT = new JsonFileFormat
lazy val CHECKPOINT_FILE_FORMAT_PARQUET = new ParquetFileFormat
lazy val CHECKPOINT_FILE_FORMAT_JSON = new JsonFileFormat
lazy val CHECKSUM_FILE_FORMAT = new JsonFileFormat

def apply(format: FileFormat, fs: FileSystem, paths: Seq[Path]): DeltaLogFileIndex = {
DeltaLogFileIndex(format, paths.map(fs.getFileStatus).toArray)
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Protocol}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

@@ -62,6 +62,9 @@ object DeltaUDF {
def booleanFromString(s: String => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromStringTemplate, s, udf(s))

def booleanFromProtocol(f: Protocol => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromProtocolTemplate, f, udf(f))

def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

@@ -95,6 +98,9 @@ object DeltaUDF {
private lazy val booleanFromStringTemplate =
udf((_: String) => false).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromProtocolTemplate =
udf((_: Protocol) => true).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

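For context, the new `booleanFromProtocol` UDF is what lets the cleanup path evaluate protocol support directly on a `protocol` column. A minimal sketch of its intended use; the `checksumDf` DataFrame and the column name are assumptions for illustration:

```scala
import org.apache.spark.sql.functions.{col, not}

// Hypothetical DataFrame of checksum (CRC) rows that exposes a "protocol" column.
val supportedForRead = DeltaUDF.booleanFromProtocol(_.supportedForRead())(col("protocol"))
val unreadable = checksumDf.filter(not(supportedForRead)) // rows this client cannot read
```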
spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala (123 changes: 116 additions & 7 deletions)
@@ -18,19 +18,22 @@ package org.apache.spark.sql.delta

import java.util.{Calendar, TimeZone}

import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity}
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.commons.lang3.time.DateUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.internal.MDC
import org.apache.spark.sql.functions.{col, isnull, lit, not, when}

private[delta] object TruncationGranularity extends Enumeration {
type TruncationGranularity = Value
@@ -41,6 +44,11 @@ private[delta] object TruncationGranularity {
trait MetadataCleanup extends DeltaLogging {
self: DeltaLog =>

protected type VersionRange = NumericRange.Inclusive[Long]

protected def versionRange(start: Long, end: Long): VersionRange =
NumericRange.inclusive[Long](start = start, end = end, step = 1)

/** Whether to clean up expired log files and checkpoints. */
def enableExpiredLogCleanup(metadata: Metadata): Boolean =
DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.fromMetaData(metadata)
@@ -163,24 +171,125 @@ trait MetadataCleanup extends DeltaLogging {
files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
}

protected def checkpointExistsAtCleanupBoundary(deltaLog: DeltaLog, version: Long): Boolean = {
if (spark.conf.get(DeltaSQLConf.ALLOW_METADATA_CLEANUP_CHECKPOINT_EXISTENCE_CHECK_DISABLED)) {
return false
}

val upperBoundVersion = Some(CheckpointInstance(version = version + 1))
deltaLog
.findLastCompleteCheckpointBefore(upperBoundVersion)
.exists(_.version == version)
}

/**
* Validates whether the metadata cleanup adheres to the CheckpointProtectionTableFeature
* requirements. Metadata cleanup is only allowed if we can clean up everything before
* requireCheckpointProtectionBeforeVersion. The implementation below scans the history
* until it finds a commit that satisfies the invariant.
* requirements. Metadata cleanup is only allowed if we can clean up everything before
* requireCheckpointProtectionBeforeVersion. If this is not possible, we can still clean up
* if there is already a checkpoint at the cleanup boundary version.
*
* If none of the invariants above are satisfied, we validate whether we support all
* protocols in the commit range we are planning to delete. If we encounter an unsupported
* protocol we skip the cleanup.
*/
private def metadataCleanupAllowed(
snapshot: Snapshot,
fileCutOffTime: Long): Boolean = {
def expandVersionRange(currentRange: VersionRange, versionToCover: Long): VersionRange =
versionRange(currentRange.start.min(versionToCover), currentRange.end.max(versionToCover))

val checkpointProtectionVersion =
CheckpointProtectionTableFeature.getCheckpointProtectionVersion(snapshot)
if (checkpointProtectionVersion <= 0) return true

def versionGreaterOrEqualToThreshold(file: FileStatus): Boolean =
getDeltaFileOrCheckpointVersion(file.getPath) >= checkpointProtectionVersion - 1

val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime)
expiredDeltaLogs.isEmpty || expiredDeltaLogs.exists(versionGreaterOrEqualToThreshold)
if (expiredDeltaLogs.isEmpty) return true

val deltaLog = snapshot.deltaLog
val toCleanVersionRange = expiredDeltaLogs
.filter(isDeltaFile)
.collect { case DeltaFile(_, version) => version }
// Stop early if we cannot clean up beyond the checkpointProtectionVersion.
// We include equality for the CheckpointProtection invariant check below.
// Assumes commit versions are continuous.
.takeWhile { _ <= checkpointProtectionVersion - 1 }
.foldLeft(versionRange(Long.MaxValue, 0L))(expandVersionRange)

// CheckpointProtectionTableFeature main invariant.
if (toCleanVersionRange.end >= checkpointProtectionVersion - 1) return true
// If we cannot delete up to the checkpoint protection version, check whether a checkpoint
// already exists at the cleanup boundary. If it does, it is safe to clean up to the boundary.
if (checkpointExistsAtCleanupBoundary(deltaLog, toCleanVersionRange.end + 1L)) return true

// If the CheckpointProtectionTableFeature invariants do not hold, we must support all
// protocols for commits that we are cleaning up. Also, we have to support the first
// commit that we retain, because we will be creating a new checkpoint for that commit.
allProtocolsSupported(
deltaLog,
versionRange(toCleanVersionRange.start, toCleanVersionRange.end + 1L))
}

/**
* Validates whether the client supports read for all the protocols in the provided checksums
* as well as write for `versionThatRequiresWriteSupport`.
*
* @param deltaLog The log of the delta table.
* @param checksumsToValidate An iterator with the checksum files we need to validate. The client
* needs read support for all the encountered protocols.
* @param versionThatRequiresWriteSupport The version for which the client needs write support.
* This is the version at which we create a new checkpoint.
* @param expectedChecksumFileCount The expected number of checksum files. If the iterator
* contains fewer files, the function returns false.
* @return Returns false if there is an unsupported or null protocol in the provided checksums.
* Returns true otherwise.
*/
protected[delta] def allProtocolsSupported(
deltaLog: DeltaLog,
checksumsToValidate: Iterator[FileStatus],
versionThatRequiresWriteSupport: Long,
expectedChecksumFileCount: Long): Boolean = {
if (!spark.conf.get(DeltaSQLConf.ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED)) {
return false
}

val schemaToUse = Action.logSchema(Set("protocol"))
val supportedForRead = DeltaUDF.booleanFromProtocol(_.supportedForRead())(col("protocol"))
val supportedForWrite = DeltaUDF.booleanFromProtocol(_.supportedForWrite())(col("protocol"))
val supportedForReadAndWrite = supportedForRead && supportedForWrite
val supported =
when(col("version") === lit(versionThatRequiresWriteSupport), supportedForReadAndWrite)
.otherwise(supportedForRead)

val fileIndexOpt =
DeltaLogFileIndex(DeltaLogFileIndex.CHECKSUM_FILE_FORMAT, checksumsToValidate.toSeq)
val fileIndexSupportedOpt = fileIndexOpt.map { index =>
if (index.inputFiles.length != expectedChecksumFileCount) return false

deltaLog
.loadIndex(index, schemaToUse)
// If we find any CRC with no protocol definition we need to abort.
.filter(isnull(col("protocol")) || not(supported))
.take(1)
.isEmpty
}
fileIndexSupportedOpt.getOrElse(true)
}

protected[delta] def allProtocolsSupported(
deltaLog: DeltaLog,
versionRange: VersionRange): Boolean = {
// We only expect backfilled commits in the range.
val checksumsToValidate = deltaLog
.listFrom(versionRange.start)
.collect { case ChecksumFile(fileStatus, version) => (fileStatus, version) }
.takeWhile { case (_, version) => version <= versionRange.end }
.map { case (fileStatus, _) => fileStatus }

allProtocolsSupported(
deltaLog,
checksumsToValidate,
versionThatRequiresWriteSupport = versionRange.end,
expectedChecksumFileCount = versionRange.end - versionRange.start + 1)
}

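For intuition, the `foldLeft` in `metadataCleanupAllowed` above simply collapses the expired, backfilled commit versions into a single inclusive `[min, max]` range. A toy run with illustrative version numbers, assuming the `versionRange`/`expandVersionRange` helpers defined above:

```scala
// Start from an empty range (start > end) and widen it with every expired commit version.
val expiredCommitVersions = Seq(10L, 11L, 12L)
val toClean = expiredCommitVersions
  .foldLeft(versionRange(Long.MaxValue, 0L))(expandVersionRange)
// toClean.start == 10 and toClean.end == 12; the new checkpoint would be created at version 13,
// which is why allProtocolsSupported is invoked with the range [10, 13].
```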
@@ -77,6 +77,11 @@ case class TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table: DeltaTab
override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = true
}

case class TestUnsupportedWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand {
override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = true
}

case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {
spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala (41 changes: 39 additions & 2 deletions)
@@ -379,9 +379,11 @@ object TableFeature {
TestLegacyWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
TestUnsupportedWriterFeature,
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestUnsupportedReaderWriterFeature,
TestUnsupportedNoHistoryProtectionReaderWriterFeature,
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature,
TestRemovableWriterFeature,
@@ -400,8 +402,12 @@ object TableFeature {
}

/** Test only features that appear unsupported in order to test protocol validations. */
def testUnsupportedFeatures: Set[TableFeature] =
if (isTesting) Set(TestUnsupportedReaderWriterFeature) else Set.empty
def testUnsupportedFeatures: Set[TableFeature] = {
if (!isTesting) return Set.empty
Set(TestUnsupportedReaderWriterFeature,
TestUnsupportedNoHistoryProtectionReaderWriterFeature,
TestUnsupportedWriterFeature)
}

private val allDependentFeaturesMap: Map[TableFeature, Set[TableFeature]] = {
val dependentFeatureTuples =
@@ -1158,6 +1164,37 @@ object TestUnsupportedReaderWriterFeature
override def actionUsesFeature(action: Action): Boolean = false
}

/**
* Test feature that appears unsupported and can be dropped without checkpoint protection.
* It is used only for testing purposes.
*/
object TestUnsupportedNoHistoryProtectionReaderWriterFeature
extends ReaderWriterFeature(name = "testUnsupportedNoHistoryProtectionReaderWriter")
with RemovableFeature {

override def validateRemoval(snapshot: Snapshot): Boolean = true

override def requiresHistoryProtection: Boolean = false

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table)

override def actionUsesFeature(action: Action): Boolean = false
}

object TestUnsupportedWriterFeature
extends WriterFeature(name = "testUnsupportedWriter")
with RemovableFeature {

/** Make sure the property is not enabled on the table. */
override def validateRemoval(snapshot: Snapshot): Boolean = true

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TestUnsupportedWriterFeaturePreDowngradeCommand(table)

override def actionUsesFeature(action: Action): Boolean = false
}

private[sql] object TestRemovableWriterFeatureWithDependency
extends WriterFeature(name = "testRemovableWriterFeatureWithDependency")
with FeatureAutomaticallyEnabledByMetadata
@@ -407,6 +407,29 @@ trait TableFeatureSupport { this: Protocol =>
// new protocol
readerAndWriterFeatureNames.contains(feature.name)
}

/** Returns whether this client supports writing in a table with this protocol. */
def supportedForWrite(): Boolean = {
val supportedWriterVersions = Action.supportedWriterVersionNumbers
val supportedWriterFeatures = Action.supportedProtocolVersion().writerFeatureNames
val testUnsupportedFeatures: Set[String] = TableFeature.testUnsupportedFeatures
.filterNot(_.isReaderWriterFeature)
.map(_.name)

supportedWriterVersions.contains(this.minWriterVersion) &&
this.writerFeatureNames.subsetOf(supportedWriterFeatures -- testUnsupportedFeatures)
}

/** Returns whether this client supports reading a table with this protocol. */
def supportedForRead(): Boolean = {
val supportedReaderVersions = Action.supportedReaderVersionNumbers
val supportedReaderFeatures = Action.supportedProtocolVersion().readerFeatureNames
val testUnsupportedFeatures: Set[String] = TableFeature.testUnsupportedFeatures
.filter(_.isReaderWriterFeature).map(_.name)

supportedReaderVersions.contains(this.minReaderVersion) &&
this.readerFeatureNames.subsetOf(supportedReaderFeatures -- testUnsupportedFeatures)
}
}

object TableFeatureProtocolUtils {
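A small sketch of how a caller might combine the two new helpers; only `supportedForRead()`/`supportedForWrite()` come from the diff above, the surrounding names are illustrative:

```scala
// Hypothetical gate: reading a commit only needs read support, while the commit at which a
// new checkpoint is written needs both read and write support.
def protocolSupportedForCleanup(protocol: Protocol, createsCheckpoint: Boolean): Boolean =
  if (createsCheckpoint) protocol.supportedForRead() && protocol.supportedForWrite()
  else protocol.supportedForRead()
```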
@@ -440,6 +440,25 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED =
buildConf("tableFeatures.allowMetadataCleanupWhenAllProtocolsSupported")
.internal()
.doc(
"""Whether to perform protocol validation when the client is unable to clean
|up to 'delta.requireCheckpointProtectionBeforeVersion'.""".stripMargin)
.booleanConf
.createWithDefault(true)

val ALLOW_METADATA_CLEANUP_CHECKPOINT_EXISTENCE_CHECK_DISABLED =
buildConf("tableFeatures.dev.allowMetadataCleanupCheckpointExistenceCheck.disabled")
.internal()
.doc(
"""Whether to disable the checkpoint check at the cleanup boundary when performing
|the CheckpointProtectionTableFeature validations.
|This is only used for testing purposes.""".stripMargin)
.booleanConf
.createWithDefault(false)

val FAST_DROP_FEATURE_ENABLED =
buildConf("tableFeatures.dev.fastDropFeature.enabled")
.internal()
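For reference, the new flags can be toggled through the Spark session conf. This sketch assumes the usual `spark.databricks.delta.` prefix that `DeltaSQLConf` keys are exposed under:

```scala
// Assumption: DeltaSQLConf keys are exposed under the "spark.databricks.delta." prefix.
spark.conf.set(
  "spark.databricks.delta.tableFeatures.allowMetadataCleanupWhenAllProtocolsSupported",
  "false") // opt out of the protocol-validation fallback
spark.conf.set(
  "spark.databricks.delta.tableFeatures.dev.allowMetadataCleanupCheckpointExistenceCheck.disabled",
  "true")  // test-only: skip the checkpoint-existence check at the cleanup boundary
```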