[Spark] Add protocol validation in Metadata cleanup based on CRCs #4211

Merged
@@ -91,6 +91,7 @@ object DeltaLogFileIndex {
lazy val COMMIT_FILE_FORMAT = new JsonFileFormat
lazy val CHECKPOINT_FILE_FORMAT_PARQUET = new ParquetFileFormat
lazy val CHECKPOINT_FILE_FORMAT_JSON = new JsonFileFormat
lazy val CHECKSUM_FILE_FORMAT = new JsonFileFormat

def apply(format: FileFormat, fs: FileSystem, paths: Seq[Path]): DeltaLogFileIndex = {
DeltaLogFileIndex(format, paths.map(fs.getFileStatus).toArray)
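The new CHECKSUM_FILE_FORMAT lets cleanup code read .crc files through the same JSON machinery as commit files. A minimal sketch of building an index over checksum files, assuming the Option-returning apply overload used later in this PR (the listing here is a placeholder):

import org.apache.hadoop.fs.FileStatus

// Hypothetical listing of .crc files; in this PR it comes from
// deltaLog.listFrom(...) filtered down to ChecksumFile entries.
val checksumFiles: Seq[FileStatus] = Seq.empty

// Assumed to return None for an empty listing and Some(index) otherwise,
// matching its use in MetadataCleanup.allProtocolsSupported below.
val indexOpt = DeltaLogFileIndex(DeltaLogFileIndex.CHECKSUM_FILE_FORMAT, checksumFiles)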
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Protocol}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

@@ -62,6 +62,9 @@ object DeltaUDF {
def booleanFromString(s: String => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromStringTemplate, s, udf(s))

def booleanFromProtocol(f: Protocol => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromProtocolTemplate, f, udf(f))

def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

@@ -95,6 +98,9 @@ object DeltaUDF {
private lazy val booleanFromStringTemplate =
udf((_: String) => false).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromProtocolTemplate =
udf((_: Protocol) => true).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

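A short sketch of how the new UDF wrapper is applied, mirroring its use in MetadataCleanup below; crcDf is a hypothetical DataFrame holding parsed checksum records with a protocol column:

import org.apache.spark.sql.functions.{col, not}

// Column predicate that is true when this client can read tables with the
// protocol stored in the row.
val supportedForRead = DeltaUDF.booleanFromProtocol(_.supportedForRead())(col("protocol"))

// Keep only the rows whose protocol this client cannot read.
val unreadable = crcDf.filter(not(supportedForRead))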
123 changes: 116 additions & 7 deletions spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -18,19 +18,22 @@ package org.apache.spark.sql.delta

import java.util.{Calendar, TimeZone}

import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity}
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.commons.lang3.time.DateUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.internal.MDC
import org.apache.spark.sql.functions.{col, isnull, lit, not, when}

private[delta] object TruncationGranularity extends Enumeration {
type TruncationGranularity = Value
@@ -41,6 +44,11 @@ private[delta] object TruncationGranularity {
trait MetadataCleanup extends DeltaLogging {
self: DeltaLog =>

protected type VersionRange = NumericRange.Inclusive[Long]

protected def versionRange(start: Long, end: Long): VersionRange =
NumericRange.inclusive[Long](start = start, end = end, step = 1)

/** Whether to clean up expired log files and checkpoints. */
def enableExpiredLogCleanup(metadata: Metadata): Boolean =
DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.fromMetaData(metadata)
@@ -163,24 +171,125 @@ trait MetadataCleanup extends DeltaLogging {
files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
}

protected def checkpointExistsAtCleanupBoundary(deltaLog: DeltaLog, version: Long): Boolean = {
if (spark.conf.get(DeltaSQLConf.ALLOW_METADATA_CLEANUP_CHECKPOINT_EXISTENCE_CHECK_DISABLED)) {
return false
}

val upperBoundVersion = Some(CheckpointInstance(version = version + 1))
deltaLog
.findLastCompleteCheckpointBefore(upperBoundVersion)
.exists(_.version == version)
}

/**
* Validates whether the metadata cleanup adheres to the CheckpointProtectionTableFeature
* requirements. Metadata cleanup is only allowed if we can clean up everything before
* requireCheckpointProtectionBeforeVersion. If this is not possible, we can still clean up
* if there is already a checkpoint at the cleanup boundary version.
*
* If none of the invariants above are satisfied, we validate whether we support all
* protocols in the commit range we are planning to delete. If we encounter an unsupported
* protocol, we skip the cleanup.
*/
private def metadataCleanupAllowed(
snapshot: Snapshot,
fileCutOffTime: Long): Boolean = {
def expandVersionRange(currentRange: VersionRange, versionToCover: Long): VersionRange =
versionRange(currentRange.start.min(versionToCover), currentRange.end.max(versionToCover))

val checkpointProtectionVersion =
CheckpointProtectionTableFeature.getCheckpointProtectionVersion(snapshot)
if (checkpointProtectionVersion <= 0) return true

val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime)
if (expiredDeltaLogs.isEmpty) return true

val deltaLog = snapshot.deltaLog
val toCleanVersionRange = expiredDeltaLogs
.filter(isDeltaFile)
.collect { case DeltaFile(_, version) => version }
// Stop early if we cannot clean up beyond the checkpointProtectionVersion.
// We include equality for the CheckpointProtection invariant check below.
// This assumes commit versions are contiguous.
.takeWhile { _ <= checkpointProtectionVersion - 1 }
.foldLeft(versionRange(Long.MaxValue, 0L))(expandVersionRange)

// CheckpointProtectionTableFeature main invariant.
if (toCleanVersionRange.end >= checkpointProtectionVersion - 1) return true
// If we cannot delete up to the checkpoint protection version, check whether a checkpoint
// already exists at the cleanup boundary. If it does, it is safe to clean up to the boundary.
if (checkpointExistsAtCleanupBoundary(deltaLog, toCleanVersionRange.end + 1L)) return true

// If the CheckpointProtectionTableFeature invariants do not hold, we must support all
// protocols for commits that we are cleaning up. Also, we have to support the first
// commit that we retain, because we will be creating a new checkpoint for that commit.
allProtocolsSupported(
deltaLog,
versionRange(toCleanVersionRange.start, toCleanVersionRange.end + 1L))
}
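To illustrate the fold above: starting from the sentinel range (Long.MaxValue, 0), every expired backfilled commit version widens the range until it covers all of them. A self-contained sketch with illustrative versions:

import scala.collection.immutable.NumericRange

val sentinel = NumericRange.inclusive[Long](Long.MaxValue, 0L, 1L)
val versions = Seq(3L, 4L, 5L) // expired backfilled commit versions (illustrative)
val range = versions.foldLeft(sentinel) { (r, v) =>
  NumericRange.inclusive[Long](r.start.min(v), r.end.max(v), 1L)
}
// range.start == 3 and range.end == 5; an empty input leaves the sentinel.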

/**
* Validates whether the client can read all the protocols in the provided checksums, as
* well as write `versionThatRequiresWriteSupport`.
*
* @param deltaLog The log of the delta table.
* @param checksumsToValidate An iterator with the checksum files we need to validate. The
*                            client needs read support for all the encountered protocols.
* @param versionThatRequiresWriteSupport The version for which the client needs write
*                                        support. This is the version at which we create a
*                                        new checkpoint.
* @param expectedChecksumFileCount The expected number of checksum files. If the iterator
*                                  contains fewer files, the function returns false.
* @return Returns false if there is a non-supported or null protocol in the provided
*         checksums. Returns true otherwise.
*/
protected[delta] def allProtocolsSupported(
deltaLog: DeltaLog,
checksumsToValidate: Iterator[FileStatus],
versionThatRequiresWriteSupport: Long,
expectedChecksumFileCount: Long): Boolean = {
if (!spark.conf.get(DeltaSQLConf.ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED)) {
return false
}

val schemaToUse = Action.logSchema(Set("protocol"))
val supportedForRead = DeltaUDF.booleanFromProtocol(_.supportedForRead())(col("protocol"))
val supportedForWrite = DeltaUDF.booleanFromProtocol(_.supportedForWrite())(col("protocol"))
val supportedForReadAndWrite = supportedForRead && supportedForWrite
val supported =
when(col("version") === lit(versionThatRequiresWriteSupport), supportedForReadAndWrite)
.otherwise(supportedForRead)

val fileIndexOpt =
DeltaLogFileIndex(DeltaLogFileIndex.CHECKSUM_FILE_FORMAT, checksumsToValidate.toSeq)
val fileIndexSupportedOpt = fileIndexOpt.map { index =>
if (index.inputFiles.length != expectedChecksumFileCount) return false

deltaLog
.loadIndex(index, schemaToUse)
// If we find any CRC with no protocol definition, we need to abort.
.filter(isnull(col("protocol")) || not(supported))
.take(1)
.isEmpty
}
fileIndexSupportedOpt.getOrElse(true)
}

protected[delta] def allProtocolsSupported(
deltaLog: DeltaLog,
versionRange: VersionRange): Boolean = {
// We only expect backfilled commits in the range.
val checksumsToValidate = deltaLog
.listFrom(versionRange.start)
.collect { case ChecksumFile(fileStatus, version) => (fileStatus, version) }
.takeWhile { case (_, version) => version <= versionRange.end }
.map { case (fileStatus, _) => fileStatus }

allProtocolsSupported(
deltaLog,
checksumsToValidate,
versionThatRequiresWriteSupport = versionRange.end,
expectedChecksumFileCount = versionRange.end - versionRange.start + 1)
}
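A hypothetical invocation: if commits 10 through 20 are about to be deleted and the new checkpoint will be created at version 21, read support is needed for versions 10..21 and write support for 21 only:

// versionRange(10L, 21L) spans the deleted commits plus the checkpoint version.
val cleanupSafe = allProtocolsSupported(deltaLog, versionRange(10L, 21L))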

/**
@@ -77,6 +77,11 @@ case class TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table: DeltaTab
override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = true
}

case class TestUnsupportedWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand {
override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = true
}

case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {
41 changes: 39 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -379,9 +379,11 @@ object TableFeature {
TestLegacyWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
TestUnsupportedWriterFeature,
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestUnsupportedReaderWriterFeature,
TestUnsupportedNoHistoryProtectionReaderWriterFeature,
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature,
TestRemovableWriterFeature,
@@ -400,8 +402,12 @@
}

/** Test-only features that appear unsupported in order to test protocol validations. */
def testUnsupportedFeatures: Set[TableFeature] = {
if (!isTesting) return Set.empty
Set(TestUnsupportedReaderWriterFeature,
TestUnsupportedNoHistoryProtectionReaderWriterFeature,
TestUnsupportedWriterFeature)
}

private val allDependentFeaturesMap: Map[TableFeature, Set[TableFeature]] = {
val dependentFeatureTuples =
@@ -1158,6 +1164,37 @@ object TestUnsupportedReaderWriterFeature
override def actionUsesFeature(action: Action): Boolean = false
}

/**
* Test feature that appears unsupported and can be dropped without checkpoint protection.
* It is used only for testing purposes.
*/
object TestUnsupportedNoHistoryProtectionReaderWriterFeature
extends ReaderWriterFeature(name = "testUnsupportedNoHistoryProtectionReaderWriter")
with RemovableFeature {

override def validateRemoval(snapshot: Snapshot): Boolean = true

override def requiresHistoryProtection: Boolean = false

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table)

override def actionUsesFeature(action: Action): Boolean = false
}

object TestUnsupportedWriterFeature
extends WriterFeature(name = "testUnsupportedWriter")
with RemovableFeature {

/** Make sure the property is not enabled on the table. */
override def validateRemoval(snapshot: Snapshot): Boolean = true

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TestUnsupportedWriterFeaturePreDowngradeCommand(table)

override def actionUsesFeature(action: Action): Boolean = false
}

private[sql] object TestRemovableWriterFeatureWithDependency
extends WriterFeature(name = "testRemovableWriterFeatureWithDependency")
with FeatureAutomaticallyEnabledByMetadata
@@ -407,6 +407,29 @@ trait TableFeatureSupport { this: Protocol =>
// new protocol
readerAndWriterFeatureNames.contains(feature.name)
}

/** Returns whether this client supports writing to a table with this protocol. */
def supportedForWrite(): Boolean = {
val supportedWriterVersions = Action.supportedWriterVersionNumbers
val supportedWriterFeatures = Action.supportedProtocolVersion().writerFeatureNames
val testUnsupportedFeatures: Set[String] = TableFeature.testUnsupportedFeatures
.filterNot(_.isReaderWriterFeature)
.map(_.name)

supportedWriterVersions.contains(this.minWriterVersion) &&
this.writerFeatureNames.subsetOf(supportedWriterFeatures -- testUnsupportedFeatures)
}

/** Returns whether this client supports reading a table with this protocol. */
def supportedForRead(): Boolean = {
val supportedReaderVersions = Action.supportedReaderVersionNumbers
val supportedReaderFeatures = Action.supportedProtocolVersion().readerFeatureNames
val testUnsupportedFeatures: Set[String] = TableFeature.testUnsupportedFeatures
.filter(_.isReaderWriterFeature).map(_.name)

supportedReaderVersions.contains(this.minReaderVersion) &&
this.readerFeatureNames.subsetOf(supportedReaderFeatures -- testUnsupportedFeatures)
}
}

object TableFeatureProtocolUtils {
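A hedged sketch of the two new checks, assuming a table-features protocol with deletion vectors enabled (Protocol and DeletionVectorsTableFeature as defined in the Delta codebase; the values are illustrative):

import org.apache.spark.sql.delta.actions.Protocol

val protocol = Protocol(minReaderVersion = 3, minWriterVersion = 7)
  .withFeature(DeletionVectorsTableFeature)

// Read support suffices for parsing the commits being deleted; write support
// is additionally required at the version where a new checkpoint is created.
val canRead = protocol.supportedForRead()
val canCheckpoint = canRead && protocol.supportedForWrite()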
@@ -440,6 +440,25 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED =
buildConf("tableFeatures.allowMetadataCleanupWhenAllProtocolsSupported")
.internal()
.doc(
"""Whether to perform protocol validation when the client is unable to clean
|up to 'delta.requireCheckpointProtectionBeforeVersion'.""".stripMargin)
.booleanConf
.createWithDefault(true)

val ALLOW_METADATA_CLEANUP_CHECKPOINT_EXISTENCE_CHECK_DISABLED =
buildConf("tableFeatures.dev.allowMetadataCleanupCheckpointExistenceCheck.disabled")
.internal()
.doc(
"""Whether to disable the checkpoint check at the cleanup boundary when performing
|the CheckpointProtectionTableFeature validations.
|This is only used for testing purposes.'.""".stripMargin)
.booleanConf
.createWithDefault(false)

val FAST_DROP_FEATURE_ENABLED =
buildConf("tableFeatures.dev.fastDropFeature.enabled")
.internal()
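A sketch of toggling the two new internal confs in a test session, assuming a SparkSession named spark:

// Opt out of the CRC-based protocol validation fallback.
spark.conf.set(
  DeltaSQLConf.ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED.key, "false")

// Test-only: skip the checkpoint existence check at the cleanup boundary.
spark.conf.set(
  DeltaSQLConf.ALLOW_METADATA_CLEANUP_CHECKPOINT_EXISTENCE_CHECK_DISABLED.key, "true")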