diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index 5f5c7bd9f97..e4feab64a49 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -433,7 +433,7 @@ trait Checkpoints extends DeltaLogging { .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv) .toArray val lastCheckpoint = - getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version)) + Checkpoints.getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version)) if (lastCheckpoint.isDefined) { logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}") return lastCheckpoint @@ -444,31 +444,6 @@ trait Checkpoints extends DeltaLogging { logInfo(s"No checkpoint found for Delta table before version $startVersion") None } - - /** - * Given a list of checkpoint files, pick the latest complete checkpoint instance which is not - * later than `notLaterThan`. - */ - protected[delta] def getLatestCompleteCheckpointFromList( - instances: Array[CheckpointInstance], - notLaterThanVersion: Option[Long] = None): Option[CheckpointInstance] = { - val sentinelCv = CheckpointInstance.sentinelValue(notLaterThanVersion) - val complete = instances.filter(_ <= sentinelCv).groupBy(identity).filter { - case (ci, matchingCheckpointInstances) => - ci.format match { - case CheckpointInstance.Format.SINGLE => - matchingCheckpointInstances.length == 1 - case CheckpointInstance.Format.WITH_PARTS => - assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts") - matchingCheckpointInstances.length == ci.numParts.get - case CheckpointInstance.Format.V2 => - matchingCheckpointInstances.length == 1 - case CheckpointInstance.Format.SENTINEL => - false - } - } - if (complete.isEmpty) None else Some(complete.keys.max) - } } object Checkpoints @@ -1050,6 +1025,31 @@ object Checkpoints None } else Some(struct(partitionValues: _*).as(STRUCT_PARTITIONS_COL_NAME)) } + + /** + * Given a list of checkpoint files, pick the latest complete checkpoint instance which is not + * later than `notLaterThan`. 
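+   *
+   * For example, given a single-file checkpoint at version 8 and a two-part checkpoint at
+   * version 10 for which only one part file is listed, this returns the version 8 instance,
+   * because the version 10 checkpoint is incomplete.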
+ */ + protected[delta] def getLatestCompleteCheckpointFromList( + instances: Array[CheckpointInstance], + notLaterThanVersion: Option[Long] = None): Option[CheckpointInstance] = { + val sentinelCv = CheckpointInstance.sentinelValue(notLaterThanVersion) + val complete = instances.filter(_ <= sentinelCv).groupBy(identity).filter { + case (ci, matchingCheckpointInstances) => + ci.format match { + case CheckpointInstance.Format.SINGLE => + matchingCheckpointInstances.length == 1 + case CheckpointInstance.Format.WITH_PARTS => + assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts") + matchingCheckpointInstances.length == ci.numParts.get + case CheckpointInstance.Format.V2 => + matchingCheckpointInstances.length == 1 + case CheckpointInstance.Format.SENTINEL => + false + } + } + if (complete.isEmpty) None else Some(complete.keys.max) + } } object V2Checkpoint { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala index a7bc20abc2b..c23831e1bb7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala @@ -21,6 +21,7 @@ import java.io.FileNotFoundException import java.sql.Timestamp import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker, JobInfo, NotebookInfo} import org.apache.spark.sql.delta.metering.DeltaLogging @@ -431,13 +432,110 @@ object DeltaHistoryManager extends DeltaLogging { override def getVersion: Long = version } + /** + * Abstract iterator for breaking a sequence of items into groups based on custom logic. It wraps + * an underlying iterator, transforming and grouping its items according to specific rules defined + * in subclasses. This iterator excels in scenarios requiring the ordered processing of data + * streams, where specific items mark the beginning of new groups. It ensures items within a group + * are emitted together. 
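+   *
+   * For example, if `isNewGroupStart` returns true only for items c and e, the input
+   * (a, b, c, d, e) is emitted as the groups (a, b), (c, d) and (e): a group is closed as soon
+   * as the next item would start a new group.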
+   */
+  private abstract class GroupBreakingIterator[T](
+      underlying: Iterator[T]) extends Iterator[ArrayBuffer[T]] {
+    private var bufferedOutput: Option[mutable.ArrayBuffer[T]] = None
+    private val bufferedUnderlying = underlying.buffered
+
+    protected def isNewGroupStart(item: T): Boolean
+
+    protected def transformItem(item: T): T = item
+
+    private def queueItemsIfNeeded(): Unit = {
+      if (bufferedOutput.isEmpty && bufferedUnderlying.hasNext) {
+        val group = new mutable.ArrayBuffer[T]()
+        while (!bufferedUnderlying.headOption.forall(isNewGroupStart) || group.isEmpty) {
+          group += transformItem(bufferedUnderlying.next())
+        }
+        bufferedOutput = Some(group)
+      }
+    }
+
+    override def hasNext: Boolean = {
+      queueItemsIfNeeded()
+      bufferedOutput.nonEmpty
+    }
+
+    override def next(): ArrayBuffer[T] = {
+      if (!hasNext) throw new NoSuchElementException("No more items to return")
+      val output = bufferedOutput.get
+      bufferedOutput = None
+      output
+    }
+  }
+
+  // All files with the same commit version
+  type SameCommitVersionFileGroup = ArrayBuffer[FileStatus]
+
+  // All commits whose timestamps need to be adjusted because of the first commit in the group
+  type TimestampAdjustedCommitGroup = ArrayBuffer[SameCommitVersionFileGroup]
+
+  // All timestamp groups that depend on the same checkpoint
+  type DependentCheckpointGroup = ArrayBuffer[TimestampAdjustedCommitGroup]
+
+  /**
+   * Returns all checkpoint groups except the last, so we never delete the last checkpoint.
+   *
+   * An iterator that helps select old log files for deletion. It takes the input iterator of
+   * eligible log file groups to delete, grouped by version and timestamp-adjusting dependency,
+   * and returns all groups up to but not including the last group containing a checkpoint.
+   * This ensures time-travel can be supported anywhere within the retention window.
+   * Example scenario:
+   * - 25 commits made.
+   * - Checkpoint files at the 10th and 20th commits.
+   * - Retention window starts from the 15th commit.
+   * We keep everything from the 10th commit checkpoint onwards.
+   * Note 1: The 10th commit is retained in the above scenario to get a valid timestamp for
+   *         version 10.
+   * Note 2: This code assumes that the underlying iterator lists groups in sorted-by-version
+   *         order.
+   */
+  class LastCheckpointPreservingLogDeletionIterator(
+      underlying: Iterator[TimestampAdjustedCommitGroup])
+    extends Iterator[DependentCheckpointGroup] {
+
+    private class CheckpointGroupingIterator(underlying: Iterator[TimestampAdjustedCommitGroup])
+      extends GroupBreakingIterator[TimestampAdjustedCommitGroup](underlying) {
+
+      override protected def isNewGroupStart(files: TimestampAdjustedCommitGroup): Boolean = {
+        val checkpointPaths = files.flatten.toArray.collect {
+          case f if isCheckpointFile(f) => CheckpointInstance(f.getPath)
+        }
+        Checkpoints.getLatestCompleteCheckpointFromList(checkpointPaths).isDefined
+      }
+    }
+
+    // 1. Group by checkpoint dependency.
+    // 2. Drop the last checkpoint-dependency group, so that the last complete checkpoint (and
+    //    everything after it) is never deleted.
+    //
+    // Implement Iterator.dropRight(1) using Iterator.sliding(2).map(_.head) since Scala doesn't
+    // provide a dropRight method for iterators.
+    // (A, B), (B, C), ..., (X, Y), (Y, Z)
+    //  ^       ^            ^       ^
+    // Note: Iterator.sliding(2) can return a single entry if the underlying iterator's size is 1
+    // https://www.scala-lang.org/old/node/7939
+    // We fix this by filtering out groups of length 1.
+ // (A) + // ^ + private val lastCheckpointPreservingIterator: Iterator[DependentCheckpointGroup] = + new CheckpointGroupingIterator(underlying).sliding(2).filter(_.length == 2).map(_.head) + + override def hasNext: Boolean = lastCheckpointPreservingIterator.hasNext + + override def next: DependentCheckpointGroup = lastCheckpointPreservingIterator.next() + } + /** * An iterator that helps select old log files for deletion. It takes the input iterator of log * files from the earliest file, and returns should-be-deleted files until the given maxTimestamp - * or maxVersion to delete is reached. Note that this iterator may stop deleting files earlier - * than maxTimestamp or maxVersion if it finds that files that need to be preserved for adjusting - * the timestamps of subsequent files. Let's go through an example. Assume the following commit - * history: + * is reached. Note that this iterator may stop deleting files earlier than maxTimestamp if it + * finds that files that need to be preserved for adjusting the timestamps of subsequent files. + * Let's go through an example. Assume the following commit history: * * +---------+-----------+--------------------+ * | Version | Timestamp | Adjusted Timestamp | @@ -453,13 +551,14 @@ object DeltaHistoryManager extends DeltaLogging { * As you can see from the example, we require timestamps to be monotonically increasing with * respect to the version of the commit, and each commit to have a unique timestamp. If we have * a commit which doesn't obey one of these two requirements, we adjust the timestamp of that - * commit to be one millisecond greater than the previous commit. + * commit to be one millisecond greater than the previous commit. We don't adjust or look at + * timestamps of checkpoints since they could be far in the future. * * Given the above commit history, the behavior of this iterator will be as follows: - * - For maxVersion = 1 and maxTimestamp = 9, we can delete versions 0 and 1 - * - Until we receive maxVersion >= 4 and maxTimestamp >= 12, we can't delete versions 2 and 3. + * - For maxTimestamp = 9, we can delete versions 0 and 1 + * - Until we receive maxTimestamp >= 12, we can't delete versions 2 and 3. * This is because version 2 is used to adjust the timestamps of commits up to version 4. - * - For maxVersion >= 5 and maxTimestamp >= 14 we can delete everything + * - For maxTimestamp >= 14 we can delete everything * The semantics of time travel guarantee that for a given timestamp, the user will ALWAYS get the * same version. Consider a user asks to get the version at timestamp 11. If all files are there, * we would return version 3 (timestamp 11) for this query. If we delete versions 0-2, the @@ -476,104 +575,156 @@ object DeltaHistoryManager extends DeltaLogging { * * @param underlying The iterator which gives the list of files in ascending version order * @param maxTimestamp The timestamp until which we can delete (inclusive). - * @param maxVersion The version until which we can delete (inclusive). - * @param versionGetter A method to get the commit version from the file path. */ - class BufferingLogDeletionIterator( - underlying: Iterator[FileStatus], - maxTimestamp: Long, - maxVersion: Long, - versionGetter: Path => Long) extends Iterator[FileStatus] { - /** - * Our output iterator - */ - private val filesToDelete = new mutable.Queue[FileStatus]() - /** - * Our intermediate buffer which will buffer files as long as the last file requires a timestamp - * adjustment. 
- */ - private val maybeDeleteFiles = new mutable.ArrayBuffer[FileStatus]() - private var lastFile: FileStatus = _ - private var hasNextCalled: Boolean = false - - private def init(): Unit = { - if (underlying.hasNext) { - lastFile = underlying.next() - maybeDeleteFiles.append(lastFile) + class TimestampAdjustingLogDeletionIterator( + underlying: Iterator[SameCommitVersionFileGroup], + maxTimestamp: Long) + extends Iterator[TimestampAdjustedCommitGroup] { + + private class TimestampAdjustingGroupedIterator( + underlying: Iterator[SameCommitVersionFileGroup]) + extends GroupBreakingIterator[SameCommitVersionFileGroup](underlying) { + + private var lastCommitFileOpt: Option[FileStatus] = None + + private def versionGetter(filePath: Path): Long = { + if (isCheckpointFile(filePath)) { + checkpointVersion(filePath) + } else { + deltaVersion(filePath) + } } - } - init() + /** + * Files need a time adjustment if their timestamp isn't later than the lastCommitFileOpt. + */ + private def needsTimestampAdjustment(commitFile: FileStatus): Boolean = { + lastCommitFileOpt.exists { lastFile => + versionGetter(lastFile.getPath) < versionGetter(commitFile.getPath) && + lastFile.getModificationTime >= commitFile.getModificationTime + } + } + + /** + * A new group can start if we have a delta file that doesn't need its timestamps changed. + * Non-delta files must continue to be in the same group because delta files coming later + * might need to adjust their timestamps based on the lastCommitFileInfoOpt. + */ + override protected def isNewGroupStart(files: SameCommitVersionFileGroup): Boolean = { + files.find(isDeltaFile).exists(!needsTimestampAdjustment(_)) + } + + private def createAdjustedFileStatus( + commitFile: FileStatus, + adjustedModificationTime: Long): FileStatus = { + new FileStatus( + commitFile.getLen, + commitFile.isDirectory, + commitFile.getReplication, + commitFile.getBlockSize, + adjustedModificationTime, + commitFile.getPath) + } + + override protected def transformItem( + files: SameCommitVersionFileGroup): SameCommitVersionFileGroup = { + val commitFileIndex = files.indexWhere(isDeltaFile) + if (commitFileIndex >= 0) { + val commitFile = files(commitFileIndex) + if (needsTimestampAdjustment(commitFile)) { + files(commitFileIndex) = + createAdjustedFileStatus(commitFile, lastCommitFileOpt.get.getModificationTime + 1) + } + lastCommitFileOpt = Some(files(commitFileIndex)) + } + files + } + } /** Whether the given file can be deleted based on the version and retention timestamp input. */ private def shouldDeleteFile(file: FileStatus): Boolean = { - file.getModificationTime <= maxTimestamp && versionGetter(file.getPath) <= maxVersion + file.getModificationTime <= maxTimestamp } /** - * Files need a time adjustment if their timestamp isn't later than the lastFile. + * A timestamp-adjusting group can be deleted if the last delta file in it can be deleted. */ - private def needsTimeAdjustment(file: FileStatus): Boolean = { - versionGetter(lastFile.getPath) < versionGetter(file.getPath) && - lastFile.getModificationTime >= file.getModificationTime + private def shouldDeleteGroup(group: TimestampAdjustedCommitGroup): Boolean = { + group.flatten.reverse.find((file: FileStatus) => isDeltaFile(file)) + .forall(file => shouldDeleteFile(file)) } - /** - * Enqueue the files in the buffer if the last file is safe to delete. Clears the buffer. 
- */ - private def flushBuffer(): Unit = { - if (maybeDeleteFiles.lastOption.exists(shouldDeleteFile)) { - filesToDelete ++= maybeDeleteFiles + private def transformLastGroup( + group: TimestampAdjustedCommitGroup): TimestampAdjustedCommitGroup = { + val deltaFileIdx = group.lastIndexWhere { files => + files.exists((file: FileStatus) => isDeltaFile(file)) } - maybeDeleteFiles.clear() + group.take(deltaFileIdx + 1) ++ + group.drop(deltaFileIdx + 1).takeWhile(_.forall(shouldDeleteFile)) } + val filteredIterator: Iterator[TimestampAdjustedCommitGroup] = + new TimestampAdjustingGroupedIterator(underlying).takeWhile(shouldDeleteGroup) + + override def hasNext: Boolean = filteredIterator.hasNext + /** - * Peeks at the next file in the iterator. Based on the next file we can have three - * possible outcomes: - * - The underlying iterator returned a file, which doesn't require timestamp adjustment. If - * the file in the buffer has expired, flush the buffer to our output queue. - * - The underlying iterator returned a file, which requires timestamp adjustment. In this case, - * we add this file to the buffer and fetch the next file - * - The underlying iterator is empty. In this case, we check the last file in the buffer. If - * it has expired, then flush the buffer to the output queue. - * Once this method returns, the buffer is expected to have 1 file (last file of the - * underlying iterator) unless the underlying iterator is fully consumed. + * We potentially drop trailing checkpoints from the last group by explicitly checking if they + * are within the retention window. */ - private def queueFilesInBuffer(): Unit = { - var continueBuffering = true - while (continueBuffering) { - if (!underlying.hasNext) { - flushBuffer() - return - } - - var currentFile = underlying.next() - require(currentFile != null, "FileStatus iterator returned null") - if (needsTimeAdjustment(currentFile)) { - currentFile = new FileStatus( - currentFile.getLen, currentFile.isDirectory, currentFile.getReplication, - currentFile.getBlockSize, lastFile.getModificationTime + 1, currentFile.getPath) - maybeDeleteFiles.append(currentFile) - } else { - flushBuffer() - maybeDeleteFiles.append(currentFile) - continueBuffering = false - } - lastFile = currentFile + override def next(): ArrayBuffer[ArrayBuffer[FileStatus]] = { + val group = filteredIterator.next() + if (hasNext) { + group + } else { + transformLastGroup(group) } } + } - override def hasNext: Boolean = { - hasNextCalled = true - if (filesToDelete.isEmpty) queueFilesInBuffer() - filesToDelete.nonEmpty + /** + * An iterator that groups same types of files by version. 
For example, for an input iterator:
+   * - 11.checkpoint.0.1.parquet
+   * - 11.checkpoint.1.1.parquet
+   * - 11.json
+   * - 12.checkpoint.parquet
+   * - 12.json
+   * - 13.json
+   * - 14.json
+   * - 15.checkpoint.0.1.parquet
+   * - 15.checkpoint.1.1.parquet
+   * - 15.checkpoint.<uuid>.parquet
+   * - 15.json
+   * This will return:
+   * - (11, Seq(11.checkpoint.0.1.parquet, 11.checkpoint.1.1.parquet, 11.json))
+   * - (12, Seq(12.checkpoint.parquet, 12.json))
+   * - (13, Seq(13.json))
+   * - (14, Seq(14.json))
+   * - (15, Seq(15.checkpoint.0.1.parquet, 15.checkpoint.1.1.parquet,
+   *   15.checkpoint.<uuid>.parquet, 15.json))
+   */
+  class DeltaLogGroupingIterator(checkpointAndDeltas: Iterator[FileStatus])
+    extends Iterator[(Long, ArrayBuffer[FileStatus])] {
+
+    private val bufferedIterator = checkpointAndDeltas.buffered
+
+    private def getFileVersion(file: FileStatus): Long = {
+      if (isCheckpointFile(file.getPath)) {
+        checkpointVersion(file.getPath)
+      } else {
+        deltaVersion(file.getPath)
+      }
+    }
+
+    override def hasNext: Boolean = bufferedIterator.hasNext
+
+    override def next(): (Long, ArrayBuffer[FileStatus]) = {
+      val first = bufferedIterator.next()
+      val buffer = scala.collection.mutable.ArrayBuffer(first)
+      val firstFileVersion = getFileVersion(first)
+      while (bufferedIterator.headOption.exists(getFileVersion(_) == firstFileVersion)) {
+        buffer += bufferedIterator.next()
+      }
+      firstFileVersion -> buffer
+    }
+  }
 }
@@ -629,4 +780,3 @@ object DeltaHistory {
       engineInfo = ci.engineInfo)
   }
 }
-
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index 9de270c359f..95d6565ef88 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -20,7 +20,7 @@ import java.util.{Calendar, TimeZone}
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
+import org.apache.spark.sql.delta.DeltaHistoryManager._
 import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, MINUTE, TruncationGranularity}
 import org.apache.spark.sql.delta.actions.{Action, Metadata}
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -109,27 +109,54 @@ trait MetadataCleanup extends DeltaLogging {
 
   /**
    * Returns an iterator of expired delta logs that can be cleaned up. For a delta log to be
-   * considered as expired, it must:
-   * - have a checkpoint file after it
+   * considered as expired, it must meet all of these conditions:
+   * - not have a timestamp-adjusted delta file that depends on it
+   * - have a checkpoint file after it that's before the `fileCutOffTime`
    * - be older than `fileCutOffTime`
+   *
+   * The algorithm works as follows:
+   * 1. Group files by their version number to create same-commit-version-file-groups.
+   * 2. Group same-commit-version-file-groups by their timestamp skew (adjusted timestamp) to
+   *    create timestamp-adjusted-commit-groups.
+   * 3. Keep only the timestamp-adjusted-commit-groups whose start timestamp is less than or equal
+   *    to the cutoff timestamp.
+   * 4. Remove any timestamp-adjusted-commit-groups that are fully protected.
+   * 5. For the last timestamp group, remove any version groups whose adjusted timestamp is after
+   *    the cutoff.
+   * 6.
Check each remaining timestamp-adjusted-commit-group to see if it contains a complete + * checkpoint. + * 7. Group timestamp-adjusted-commit-group based on their checkpoint dependency (i.e., which + * checkpoint they depend on) to create dependent-checkpoint-groups. + * 8. Remove the last dependent-checkpoint-groups group (to ensure we always retain at least one + * checkpoint). + * 9. Triple Flatten the remaining groups and delete all those files. + * + * Note: We always consume the iterator lazily in all of the above steps to avoid loading all + * files in memory at once. */ private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = { import org.apache.spark.sql.delta.util.FileNames._ val latestCheckpoint = readLastCheckpointFile() if (latestCheckpoint.isEmpty) return Iterator.empty - val threshold = latestCheckpoint.get.version - 1L val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf()) .filter(f => isCheckpointFile(f) || isDeltaFile(f)) - def getVersion(filePath: Path): Long = { - if (isCheckpointFile(filePath)) { - checkpointVersion(filePath) - } else { - deltaVersion(filePath) - } - } - new BufferingLogDeletionIterator(files, fileCutOffTime, threshold, getVersion) + val groupedByVersion: Iterator[SameCommitVersionFileGroup] = + new DeltaLogGroupingIterator(files).map(_._2) + + val filteredByMaxTimestampAndTimestampSkew: Iterator[TimestampAdjustedCommitGroup] = + new TimestampAdjustingLogDeletionIterator( + underlying = groupedByVersion, + maxTimestamp = fileCutOffTime) + + val filteredByCheckpointDependency: Iterator[DependentCheckpointGroup] = + new LastCheckpointPreservingLogDeletionIterator(filteredByMaxTimestampAndTimestampSkew) + + val eligibleFilesToDelete: Iterator[FileStatus] = + filteredByCheckpointDependency.flatten.flatten.flatten + + eligibleFilesToDelete } /** @@ -187,7 +214,7 @@ trait MetadataCleanup extends DeltaLogging { .filter(_.format != CheckpointInstance.Format.V2) .toArray val availableNonV2Checkpoints = - getLatestCompleteCheckpointFromList(checkpoints, Some(checkpointVersion)) + Checkpoints.getLatestCompleteCheckpointFromList(checkpoints, Some(checkpointVersion)) if (availableNonV2Checkpoints.nonEmpty) { metrics.v2CheckpointCompatLogicTimeTakenMs = System.currentTimeMillis() - startTimeMs return @@ -343,7 +370,7 @@ trait MetadataCleanup extends DeltaLogging { def isCurrentCheckpointComplete: Boolean = { val instances = currentCheckpointFiles.map(CheckpointInstance(_)).toArray - getLatestCompleteCheckpointFromList(instances).isDefined + Checkpoints.getLatestCompleteCheckpointFromList(instances).isDefined } store.listFrom(listingPrefix(logPath, 0L), hadoopConf) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index c7e3f56c2cd..d24c67ca1b4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -334,7 +334,8 @@ trait SnapshotManagement { self: DeltaLog => val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile) // Find the latest checkpoint in the listing that is not older than the versionToLoad val checkpointFiles = checkpoints.map(f => CheckpointInstance(f.getPath)) - val newCheckpoint = getLatestCompleteCheckpointFromList(checkpointFiles, versionToLoad) + val newCheckpoint = Checkpoints.getLatestCompleteCheckpointFromList( + checkpointFiles, versionToLoad) val 
newCheckpointVersion = newCheckpoint.map(_.version).getOrElse { // If we do not have any checkpoint, pass new checkpoint version as -1 so that first // delta version can be 0. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala index 4684ee20685..35e2b50e55f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala @@ -27,9 +27,10 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.delta.DeltaHistoryManager.DeltaLogGroupingIterator import org.apache.hadoop.fs.Path - import org.apache.spark.SparkConf + import org.apache.spark.sql.QueryTest import org.apache.spark.util.ManualClock @@ -269,11 +270,15 @@ class DeltaRetentionSuite extends QueryTest log.checkpoint() val initialFiles = getLogFiles(logPath) + // Create a new commit and checkpoint before hte retention window so that the previous one + // can be deleted + startTxnWithManualLogCleanup(log).commit(AddFile("1", Map.empty, 1, 1, true) :: Nil, testOp) + log.checkpoint() + clock.advance(intervalStringToMillis(DeltaConfigs.LOG_RETENTION.defaultValue) + intervalStringToMillis("interval 1 day")) - // Create a new checkpoint so that the previous version can be deleted - log.startTransaction().commit(AddFile("1", Map.empty, 1, 1, true) :: Nil, testOp) + log.startTransaction().commit(AddFile("2", Map.empty, 1, 1, true) :: Nil, testOp) log.checkpoint() // despite our clock time being set in the future, this doesn't change the FileStatus @@ -331,6 +336,94 @@ class DeltaRetentionSuite extends QueryTest } } + test("DeltaLogGroupingIterator") { + val paths = Seq( + // checkpoint, commit and crc file present for v1 + "file://a/b/_delta_log/1.checkpoint.parquet", + "file://a/b/_delta_log/1.crc", + "file://a/b/_delta_log/1.json", + + // only json file present for v2 + "file://a/b/_delta_log/2.json", + // v3 missing + + // multiple types of checkpoint, compacted-delta and crc present for v4 + "file://a/b/_delta_log/4.6.compacted.json", + "file://a/b/_delta_log/4.checkpoint.parquet", + "file://a/b/_delta_log/4.checkpoint.uuid.parquet", + "file://a/b/_delta_log/4.checkpoint.json.parquet", + "file://a/b/_delta_log/4.checkpoint.0.1.parquet", + "file://a/b/_delta_log/4.checkpoint.1.1.parquet", + "file://a/b/_delta_log/4.crc", + "file://a/b/_delta_log/4.json", + // v5, v6 with single checkpoint file + "file://a/b/_delta_log/5.checkpoint.parquet", + "file://a/b/_delta_log/5.json", + "file://a/b/_delta_log/6.checkpoint.parquet", + "file://a/b/_delta_log/6.crc", + "file://a/b/_delta_log/6.json", + // no checkpoint files in the end + "file://a/b/_delta_log/7.json", + "file://a/b/_delta_log/8.9.compacted.json", + "file://a/b/_delta_log/8.json", + "file://a/b/_delta_log/9.crc", + "file://a/b/_delta_log/9.json", + "file://a/b/_delta_log/11.checkpoint.0.1.parquet", + "file://a/b/_delta_log/11.checkpoint.1.1.parquet", + "file://a/b/_delta_log/11.checkpoint.uuid.parquet", + "file://a/b/_delta_log/12.14.compacted.json", + "file://a/b/_delta_log/12.checkpoint.parquet", + "file://a/b/_delta_log/14.crc", + "file://a/b/_delta_log/14.json") + val fileStatuses = paths.map { path => + SerializableFileStatus(path, length = 10, isDir = false, modificationTime = 
1).toFileStatus + }.toIterator + val groupedFileStatuses = new DeltaLogGroupingIterator(fileStatuses) + val groupedPaths = groupedFileStatuses.toIndexedSeq.map { case (version, files) => + (version, files.map(_.getPath.toString).toList) + } + assert(groupedPaths === Seq( + 1 -> List( + "file://a/b/_delta_log/1.checkpoint.parquet", + "file://a/b/_delta_log/1.crc", + "file://a/b/_delta_log/1.json"), + 2 -> List("file://a/b/_delta_log/2.json"), + 4 -> List( + "file://a/b/_delta_log/4.6.compacted.json", + "file://a/b/_delta_log/4.checkpoint.parquet", + "file://a/b/_delta_log/4.checkpoint.uuid.parquet", + "file://a/b/_delta_log/4.checkpoint.json.parquet", + "file://a/b/_delta_log/4.checkpoint.0.1.parquet", + "file://a/b/_delta_log/4.checkpoint.1.1.parquet", + "file://a/b/_delta_log/4.crc", + "file://a/b/_delta_log/4.json"), + 5 -> List( + "file://a/b/_delta_log/5.checkpoint.parquet", + "file://a/b/_delta_log/5.json"), + 6 -> List( + "file://a/b/_delta_log/6.checkpoint.parquet", + "file://a/b/_delta_log/6.crc", + "file://a/b/_delta_log/6.json"), + 7 -> List("file://a/b/_delta_log/7.json"), + 8 -> List( + "file://a/b/_delta_log/8.9.compacted.json", + "file://a/b/_delta_log/8.json"), + 9 -> List( + "file://a/b/_delta_log/9.crc", + "file://a/b/_delta_log/9.json"), + 11 -> List( + "file://a/b/_delta_log/11.checkpoint.0.1.parquet", + "file://a/b/_delta_log/11.checkpoint.1.1.parquet", + "file://a/b/_delta_log/11.checkpoint.uuid.parquet"), + 12 -> List( + "file://a/b/_delta_log/12.14.compacted.json", + "file://a/b/_delta_log/12.checkpoint.parquet"), + 14 -> List( + "file://a/b/_delta_log/14.crc", + "file://a/b/_delta_log/14.json") + )) + } + protected def cleanUpExpiredLogs(log: DeltaLog): Unit = { val snapshot = log.update() @@ -379,7 +472,7 @@ class DeltaRetentionSuite extends QueryTest log.startTransaction().commit(Seq(log.unsafeVolatileSnapshot.metadata), testOp) setModificationTimeOfNewFiles(log, clock, visitedFiles) - // Write a new checkpoint on each day. Each checkpoint has 2 sodecars: + // Write a new checkpoint on each day. Each checkpoint has 2 sidecars: // 1. Common sidecar - one of oddCommitSidecarFile_1/evenCommitSidecarFile_1 // 2. A new sidecar just created for this checkpoint. val sidecarFile1 = @@ -407,43 +500,43 @@ class DeltaRetentionSuite extends QueryTest evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++ sidecarFiles.values.toIndexedSeq) - // Trigger metadata cleanup and validate that only last 6 days of deltas and checkpoints + // Trigger metadata cleanup and validate that only last 6(+1) days of deltas and checkpoints // have been retained. cleanUpExpiredLogs(log) - compareVersions(getCheckpointVersions(logPath), "checkpoint", 4 to 9) - compareVersions(getDeltaVersions(logPath), "delta", 4 to 9) + compareVersions(getCheckpointVersions(logPath), "checkpoint", 3 to 9) + compareVersions(getDeltaVersions(logPath), "delta", 3 to 9) // Check that all active sidecars are retained and expired ones are deleted. assert( getSidecarFiles(log) === Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++ - (4 to 9).map(sidecarFiles(_))) + (3 to 9).map(sidecarFiles(_))) // Advance 1 day and again run metadata cleanup. clock.setTime(day(startTime, day = 11)) cleanUpExpiredLogs(log) setModificationTimeOfNewFiles(log, clock, visitedFiles) - // Commit 4 and checkpoint 4 have expired and were deleted. - compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 9) - compareVersions(getDeltaVersions(logPath), "delta", 5 to 9) + // Commit 3 and checkpoint 3 have expired and were deleted. 
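+    // Checkpoint 3 was safe to drop because checkpoint 4 is now the earliest retained
+    // checkpoint, so every version left in the retention window can still be reconstructed.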
+ compareVersions(getCheckpointVersions(logPath), "checkpoint", 4 to 9) + compareVersions(getDeltaVersions(logPath), "delta", 4 to 9) assert( getSidecarFiles(log) === Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++ - (5 to 9).map(sidecarFiles(_))) + (4 to 9).map(sidecarFiles(_))) // do 1 more commit and checkpoint on day 13 and run metadata cleanup. commitAndCheckpoint(dayNumber = 13) // commit and checkpoint 10 - compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 10) - compareVersions(getDeltaVersions(logPath), "delta", 5 to 10) + compareVersions(getCheckpointVersions(logPath), "checkpoint", 4 to 10) + compareVersions(getDeltaVersions(logPath), "delta", 4 to 10) cleanUpExpiredLogs(log) setModificationTimeOfNewFiles(log, clock, visitedFiles) - // Version 5 and 6 checkpoints and deltas have expired and were deleted. - compareVersions(getCheckpointVersions(logPath), "checkpoint", 7 to 10) - compareVersions(getDeltaVersions(logPath), "delta", 7 to 10) + // Version 4 and 5 checkpoints and deltas have expired and were deleted. + compareVersions(getCheckpointVersions(logPath), "checkpoint", 6 to 10) + compareVersions(getDeltaVersions(logPath), "delta", 6 to 10) assert( getSidecarFiles(log) === Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++ - (7 to 10).map(sidecarFiles(_))) + (6 to 10).map(sidecarFiles(_))) } } } @@ -488,8 +581,8 @@ class DeltaRetentionSuite extends QueryTest // 11th day Run metadata cleanup. clock.setTime(day(startTime, 11)) cleanUpExpiredLogs(log) - compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 10) - compareVersions(getDeltaVersions(logPath), "delta", 5 to 10) + compareVersions(getCheckpointVersions(logPath), "checkpoint", 4 to 10) + compareVersions(getDeltaVersions(logPath), "delta", 4 to 10) val checkpointInstancesForV10 = getCheckpointFiles(logPath) .filter(f => getFileVersions(Seq(f)).head == 10) @@ -504,4 +597,103 @@ class DeltaRetentionSuite extends QueryTest } } } + + test("cleanup does not delete the latest checkpoint before the cutoff") { + withTempDir { tempDir => + val startTime = getStartTimeForRetentionTest + val clock = new ManualClock(startTime) + val actualTestStartTime = System.currentTimeMillis() + val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock) + val logPath = new File(log.logPath.toUri) + + // enforce 10 day checkpoint interval -- commit 0 + spark.sql( + s"""CREATE TABLE delta.`${tempDir.toString()}` (id Int) USING delta + | TBLPROPERTIES( + |-- Disable the async log cleanup as this test needs to manually trigger log + |-- clean up. + |'delta.enableExpiredLogCleanup' = 'false', + |'delta.checkpointInterval' = '10') + """.stripMargin) + // Set time for commit 0 to ensure that the commits don't need timestamp adjustment. 
+ val commit0Time = clock.getTimeMillis() + new File(FileNames.deltaFile(log.logPath, 0).toUri).setLastModified(commit0Time) + new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time) + + // Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10 + (1 to 15).foreach { i => + val txn = log.startTransaction() + val file = AddFile(i.toString, Map.empty, 1, 1, true) :: Nil + val delete: Seq[Action] = if (i > 1) { + val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime) + RemoveFile(i - 1 toString, Some(timestamp), true) :: Nil + } else { + Nil + } + val version = txn.commit(delete ++ file, testOp) + val deltaFile = new File(FileNames.deltaFile(log.logPath, version).toUri) + val time = clock.getTimeMillis() + i * 1000 + deltaFile.setLastModified(time) + val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri) + crcFile.setLastModified(time) + val chk = new File(FileNames.checkpointFileSingular(log.logPath, version).toUri) + if (chk.exists()) { + chk.setLastModified(time) + } + } + + // ensure that the checkpoint at version 10 exists + val checkpointFile = getCheckpointFiles(logPath) + .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10).head + assert(checkpointFile.exists()) + val deltaFiles = (1 to 15).map { i => + new File(FileNames.deltaFile(log.logPath, i).toUri) + } + deltaFiles.foreach { f => + assert(f.exists()) + } + + // Day 35: Add commits 16 to 25 --> creates 1 checkpoint at Day 35 for version 20 + clock.setTime(day(startTime, 35)) + (16 to 25).foreach { i => + val txn = if (i == 1) startTxnWithManualLogCleanup(log) else log.startTransaction() + val file = AddFile(i.toString, Map.empty, 1, 1, true) :: Nil + val delete: Seq[Action] = if (i > 1) { + val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime) + RemoveFile(i - 1 toString, Some(timestamp), true) :: Nil + } else { + Nil + } + val version = txn.commit(delete ++ file, testOp) + val deltaFile = new File(FileNames.deltaFile(log.logPath, version).toUri) + val time = clock.getTimeMillis() + i * 1000 + deltaFile.setLastModified(time) + val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri) + crcFile.setLastModified(time) + val chk = new File(FileNames.checkpointFileSingular(log.logPath, version).toUri) + if (chk.exists()) { + chk.setLastModified(time) + } + } + + assert(checkpointFile.exists()) + deltaFiles.foreach { f => + assert(f.exists()) + } + cleanUpExpiredLogs(log) + + // assert that the checkpoint from day 0 (at version 10) and all the commits after + // that are still there + assert(checkpointFile.exists()) + deltaFiles.zipWithIndex.foreach { case (f, i) => + val version = i + 1 // From 0-based indexing to 1-based versioning + if (version < 10) { + assert(!f.exists(), version) + } else { + assert(f.exists(), version) + } + } + } + } } + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala index e6076f1a22c..c805ae60c9a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala @@ -21,16 +21,18 @@ import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Date +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions -import 
org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator +import org.apache.spark.sql.delta.DeltaHistoryManager.{DeltaLogGroupingIterator, LastCheckpointPreservingLogDeletionIterator, TimestampAdjustingLogDeletionIterator} import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames} +import org.apache.spark.sql.delta.util.FileNames.isCheckpointFile import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{functions, AnalysisException, QueryTest, Row} @@ -203,20 +205,49 @@ class DeltaTimeTravelSuite extends QueryTest */ private def createFileStatuses(modTimes: Long*): Iterator[FileStatus] = { modTimes.zipWithIndex.map { case (time, version) => - new FileStatus(10L, false, 1, 10L, time, new Path(version.toString)) + new FileStatus(10L, false, 1, 10L, time, new Path(s"file://a/b/_delta_log/$version.json")) + }.iterator + } + + private sealed trait TestAction + private case class Delta(time: Long) extends TestAction + private case class Checkpoint(time: Long) extends TestAction + private case class IncompleteCheckpoint(time: Long) extends TestAction + private case class CheckpointAndDelta(time: Long) extends TestAction + private case class IncompleteCheckpointAndDelta(time: Long) extends TestAction + private case class ExceptionThrowingFile(time: Long) extends TestAction + + private def createFileStatusesWithCheckpoints(inputData: TestAction*): Iterator[FileStatus] = { + def createFileStatus(time: Long, version: Int, suffix: String): FileStatus = + new FileStatus(10L, false, 1, 10L, time, new Path(s"file://a/b/_delta_log/$version$suffix")) + inputData.zipWithIndex.flatMap { case (action, version) => + action match { + case Delta(time) => Seq(createFileStatus(time, version, ".json")) + case Checkpoint(time) => Seq(createFileStatus(time, version, ".checkpoint.parquet")) + case IncompleteCheckpoint(time) => + Seq(createFileStatus(time, version, ".checkpoint.0.2.parquet")) + case CheckpointAndDelta(time) => + Seq(createFileStatus(time, version, ".checkpoint.parquet"), + createFileStatus(time, version, ".json")) + case IncompleteCheckpointAndDelta(time) => + Seq(createFileStatus(time, version, ".checkpoint.0.2.parquet"), + createFileStatus(time, version, ".json")) + case ExceptionThrowingFile(time) => + Seq(new FileStatus(10L, false, 1, 10L, time, new Path(s"gibberish.gibberish"))) + } }.iterator } /** - * Creates a log deletion iterator with a retention `maxTimestamp` and `maxVersion` (both - * inclusive). The input iterator takes the original file timestamps, and the deleted output will - * return the adjusted timestamps of files that would actually be consumed by the iterator. + * Creates a log deletion iterator with a retention `maxTimestamp` (inclusive). The input + * iterator takes the original file timestamps, and the deleted output will return the adjusted + * timestamps of files that would actually be consumed by the iterator. 
*/ - private def testBufferingLogDeletionIterator( - maxTimestamp: Long, - maxVersion: Long)(inputTimestamps: Seq[Long], deleted: Seq[Long]): Unit = { - val i = new BufferingLogDeletionIterator( - createFileStatuses(inputTimestamps: _*), maxTimestamp, maxVersion, _.getName.toLong) + private def testTimestampAdjustingLogDeletionIterator( + maxTimestamp: Long)(inputTimestamps: Seq[Long], deleted: Seq[Long]): Unit = { + val i = new TimestampAdjustingLogDeletionIterator( + new DeltaLogGroupingIterator(createFileStatuses(inputTimestamps: _*)).map(_._2), + maxTimestamp).flatten.flatten deleted.foreach { ts => assert(i.hasNext, s"Was supposed to delete $ts, but iterator returned hasNext: false") assert(i.next().getModificationTime === ts, "Returned files out of order!") @@ -224,196 +255,281 @@ class DeltaTimeTravelSuite extends QueryTest assert(!i.hasNext, "Iterator should be consumed") } - test("BufferingLogDeletionIterator: iterator behavior") { - val i1 = new BufferingLogDeletionIterator(Iterator.empty, 100, 100, _ => 1) + private def testIteratorReturnsExpected( + i: Iterator[FileStatus], outputData: Seq[TestAction]): Unit = { + outputData.foreach { action => + val deltaTsOpt = action match { + case Delta(ts) => Some(ts) + case CheckpointAndDelta(ts) => Some(ts) + case IncompleteCheckpointAndDelta(ts) => Some(ts) + case _ => None + } + val checkpointTsOpt = action match { + case Checkpoint(ts) => Some(ts) + case IncompleteCheckpoint(ts) => Some(ts) + case CheckpointAndDelta(ts) => Some(ts) + case IncompleteCheckpointAndDelta(ts) => Some(ts) + case _ => None + } + checkpointTsOpt.foreach { ts => + assert(i.hasNext, s"Was supposed to delete $ts, but iterator returned hasNext: false") + val deletedFile = i.next() + assert(isCheckpointFile(deletedFile), "Returned file is different than expected") + } + deltaTsOpt.foreach { ts => + assert(i.hasNext, s"Was supposed to delete $ts, but iterator returned hasNext: false") + val deletedFile = i.next() + assert(!isCheckpointFile(deletedFile), "Returned file is different than expected") + } + } + assert(!i.hasNext, s"Iterator should be consumed") + } + + private def testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp: Long)( + inputData: Seq[TestAction], deletedGroupCount: Int): Unit = { + val i = new TimestampAdjustingLogDeletionIterator( + new DeltaLogGroupingIterator(createFileStatusesWithCheckpoints(inputData: _*)).map(_._2), + maxTimestamp).flatten.flatten + testIteratorReturnsExpected(i, inputData.take(deletedGroupCount)) + } + + private def testLastCheckpointPreservingLogDeletionIterator( + maxTimestamp: Long)(inputData: Seq[TestAction], deletedGroupCount: Int): Unit = { + val i = new LastCheckpointPreservingLogDeletionIterator( + new TimestampAdjustingLogDeletionIterator( + new DeltaLogGroupingIterator(createFileStatusesWithCheckpoints(inputData: _*)).map(_._2), + maxTimestamp)).flatten.flatten.flatten + testIteratorReturnsExpected(i, inputData.take(deletedGroupCount)) + } + + test("TimestampAdjustingLogDeletionIterator: iterator behavior") { + val i1 = new TimestampAdjustingLogDeletionIterator(Iterator.empty, 100) intercept[NoSuchElementException](i1.next()) assert(!i1.hasNext) - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 100)( inputTimestamps = Seq(10), deleted = Seq(10) ) - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 100)( inputTimestamps = Seq(10, 15, 
25), deleted = Seq(10, 15, 25) ) } - test("BufferingLogDeletionIterator: " + + test("TimestampAdjustingLogDeletionIterator: " + "early exit while handling adjusted timestamps due to timestamp") { // only should return 5 because 5 < 7 - testBufferingLogDeletionIterator(maxTimestamp = 7, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 7)( inputTimestamps = Seq(5, 10, 8, 12), deleted = Seq(5) ) // Should only return 5, because 10 is used to adjust the following 8 to 11 - testBufferingLogDeletionIterator(maxTimestamp = 10, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 10)( inputTimestamps = Seq(5, 10, 8, 12), deleted = Seq(5) ) // When it is 11, we can delete both 10 and 8 - testBufferingLogDeletionIterator(maxTimestamp = 11, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 11)( inputTimestamps = Seq(5, 10, 8, 12), deleted = Seq(5, 10, 11) ) // When it is 12, we can return all - testBufferingLogDeletionIterator(maxTimestamp = 12, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 12)( inputTimestamps = Seq(5, 10, 8, 12), deleted = Seq(5, 10, 11, 12) ) // Should only return 5, because 10 is used to adjust the following 8 to 11 - testBufferingLogDeletionIterator(maxTimestamp = 10, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 10)( inputTimestamps = Seq(5, 10, 8), deleted = Seq(5) ) // When it is 11, we can delete both 10 and 8 - testBufferingLogDeletionIterator(maxTimestamp = 11, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 11)( inputTimestamps = Seq(5, 10, 8), deleted = Seq(5, 10, 11) ) } - test("BufferingLogDeletionIterator: " + - "early exit while handling adjusted timestamps due to version") { - // only should return 5 because we can delete only up to version 0 - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 0)( - inputTimestamps = Seq(5, 10, 8, 12), - deleted = Seq(5) - ) - - // Should only return 5, because 10 is used to adjust the following 8 to 11 - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 1)( - inputTimestamps = Seq(5, 10, 8, 12), - deleted = Seq(5) - ) - - // When we can delete up to version 2, we can return up to version 2 - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 2)( - inputTimestamps = Seq(5, 10, 8, 12), - deleted = Seq(5, 10, 11) - ) - - // When it is version 3, we can return all - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 3)( - inputTimestamps = Seq(5, 10, 8, 12), - deleted = Seq(5, 10, 11, 12) - ) - - // Should only return 5, because 10 is used to adjust the following 8 to 11 - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 1)( - inputTimestamps = Seq(5, 10, 8), - deleted = Seq(5) - ) - - // When we can delete up to version 2, we can return up to version 2 - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 2)( - inputTimestamps = Seq(5, 10, 8), - deleted = Seq(5, 10, 11) - ) - } - - test("BufferingLogDeletionIterator: multiple adjusted timestamps") { + test("TimestampAdjustingLogDeletionIterator: multiple adjusted timestamps") { Seq(9, 10, 11).foreach { retentionTimestamp => // Files should be buffered but not deleted, because of the file 11, which has adjusted ts 12 - testBufferingLogDeletionIterator(maxTimestamp = retentionTimestamp, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = retentionTimestamp)( inputTimestamps = Seq(5, 10, 8, 11, 
14), deleted = Seq(5) ) } // Safe to delete everything before (including) file: 11 which has adjusted timestamp 12 - testBufferingLogDeletionIterator(maxTimestamp = 12, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 12)( inputTimestamps = Seq(5, 10, 8, 11, 14), deleted = Seq(5, 10, 11, 12) ) - Seq(0, 1, 2).foreach { retentionVersion => - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = retentionVersion)( - inputTimestamps = Seq(5, 10, 8, 11, 14), - deleted = Seq(5) - ) - } - - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 3)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 100)( inputTimestamps = Seq(5, 10, 8, 11, 14), - deleted = Seq(5, 10, 11, 12) + deleted = Seq(5, 10, 11, 12, 14) ) // Test when the last element is adjusted with both timestamp and version Seq(9, 10, 11).foreach { retentionTimestamp => - testBufferingLogDeletionIterator(maxTimestamp = retentionTimestamp, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = retentionTimestamp)( inputTimestamps = Seq(5, 10, 8, 9), deleted = Seq(5) ) } - testBufferingLogDeletionIterator(maxTimestamp = 12, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 12)( inputTimestamps = Seq(5, 10, 8, 9), deleted = Seq(5, 10, 11, 12) ) - Seq(0, 1, 2).foreach { retentionVersion => - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = retentionVersion)( - inputTimestamps = Seq(5, 10, 8, 9), - deleted = Seq(5) - ) - } - - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 3)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 100)( inputTimestamps = Seq(5, 10, 8, 9), deleted = Seq(5, 10, 11, 12) ) Seq(9, 10, 11).foreach { retentionTimestamp => - testBufferingLogDeletionIterator(maxTimestamp = retentionTimestamp, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = retentionTimestamp)( inputTimestamps = Seq(10, 8, 9), deleted = Nil ) } // Test the first element causing cascading adjustments - testBufferingLogDeletionIterator(maxTimestamp = 12, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 12)( inputTimestamps = Seq(10, 8, 9), deleted = Seq(10, 11, 12) ) - Seq(0, 1).foreach { retentionVersion => - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = retentionVersion)( - inputTimestamps = Seq(10, 8, 9), - deleted = Nil - ) - } - - testBufferingLogDeletionIterator(maxTimestamp = 100, maxVersion = 2)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 100)( inputTimestamps = Seq(10, 8, 9), deleted = Seq(10, 11, 12) ) // Test multiple batches of time adjustments - testBufferingLogDeletionIterator(maxTimestamp = 12, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 12)( inputTimestamps = Seq(5, 10, 8, 9, 12, 15, 14, 14), // 5, 10, 11, 12, 13, 15, 16, 17 deleted = Seq(5) ) Seq(13, 14, 15, 16).foreach { retentionTimestamp => - testBufferingLogDeletionIterator(maxTimestamp = retentionTimestamp, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = retentionTimestamp)( inputTimestamps = Seq(5, 10, 8, 9, 12, 15, 14, 14), // 5, 10, 11, 12, 13, 15, 16, 17 deleted = Seq(5, 10, 11, 12, 13) ) } - testBufferingLogDeletionIterator(maxTimestamp = 17, maxVersion = 100)( + testTimestampAdjustingLogDeletionIterator(maxTimestamp = 17)( inputTimestamps = Seq(5, 10, 8, 9, 12, 15, 14, 14), // 5, 10, 11, 12, 13, 15, 16, 17 deleted = Seq(5, 10, 11, 12, 13, 15, 16, 17) ) } + 
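+  // A minimal sketch of the adjustment rule exercised by the tests above (illustrative only,
+  // not the production code path): a timestamp that is not strictly greater than its
+  // predecessor's adjusted timestamp is bumped to predecessor + 1 ms.
+  //   def adjust(ts: Seq[Long]): Seq[Long] =
+  //     ts.tail.scanLeft(ts.head)((prev, t) => if (t > prev) t else prev + 1)
+  // e.g. adjust(Seq(5, 10, 8, 9, 12)) == Seq(5, 10, 11, 12, 13), matching the adjusted values
+  // in the comments above.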
test("TimestampAdjustingLogDeletionIterator: multiple adjusted timestamps with checkpoints") { + Seq(9, 10, 11).foreach { retentionTimestamp => + // Files should be buffered but not deleted, because of the file 11, which has adjusted ts 12 + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp = retentionTimestamp)( + inputData = + Seq(CheckpointAndDelta(5), Delta(10), Delta(8), CheckpointAndDelta(11), Delta(14)), + deletedGroupCount = 1) + } + + // Test that all checkpoints until the first checkpoint that needs to be retained are deleted. + Seq(9, 10, 11).foreach { retentionTimestamp => + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp = retentionTimestamp)( + inputData = Seq(Checkpoint(1), Checkpoint(3), IncompleteCheckpoint(5), Checkpoint(7), + Checkpoint(12), Checkpoint(9)), + deletedGroupCount = 4) + } + + Seq(7, 8, 9).foreach { retentionTimestamp => + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp = retentionTimestamp)( + inputData = Seq(CheckpointAndDelta(1), CheckpointAndDelta(3), + IncompleteCheckpointAndDelta(5), CheckpointAndDelta(7), CheckpointAndDelta(12), + CheckpointAndDelta(9)), + deletedGroupCount = 4) + } + + // Test that checkpoints are not retained for timestamp adjustments. + Seq(7, 8, 9).foreach { retentionTimestamp => + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(retentionTimestamp)( + inputData = Seq(Checkpoint(1), Checkpoint(3), Checkpoint(5), Checkpoint(7), Checkpoint(6), + Checkpoint(7), Checkpoint(10)), + deletedGroupCount = 6) + } + + // Test that checkpoints are retained for commit-files time-adjustments. + Seq(7, 8).foreach { retentionTimestamp => + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(retentionTimestamp)( + inputData = Seq(CheckpointAndDelta(1), Delta(3), Delta(5), CheckpointAndDelta(7), + Checkpoint(6), CheckpointAndDelta(6), Delta(7), CheckpointAndDelta(10)), + deletedGroupCount = 3) + } + + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp = 9)( + inputData = Seq(CheckpointAndDelta(1), Delta(3), Delta(5), CheckpointAndDelta(7), + Checkpoint(6), CheckpointAndDelta(6), Delta(7), CheckpointAndDelta(10)), + deletedGroupCount = 7) + + // Since 5.json(@timestamp-9) can be deleted, we will delete everything until that point even + // though some of the intermediate checkpoints are within the retention window. + testTimestampAdjustingLogDeletionIteratorWithCheckpoints(maxTimestamp = 10)( + inputData = Seq(Delta(1), Delta(6), Checkpoint(11), IncompleteCheckpoint(12), + CheckpointAndDelta(8), Delta(9), CheckpointAndDelta(11)), + deletedGroupCount = 6 + ) + } + + test("TimestampAdjustingLogDeletionIterator: " + + "skips consuming the iterator after entering retention window") { + // Since 1.json(@timestamp-12) can't be deleted + // - don't delete anything after that. + // - don't even consume more files after that to enable early-exit. + // (A couple of extra files is okay). 
+ val inputData = Seq(Delta(8), Delta(12), Delta(15), ExceptionThrowingFile(18)) + val groups = createFileStatusesWithCheckpoints(inputData: _*).map(file => ArrayBuffer(file)) + val i = new TimestampAdjustingLogDeletionIterator(groups, 100) + while (i.hasNext) i.next() + } + + test("LastCheckpointPreservingLogDeletionIterator: don't rely on incomplete checkpoints") { + testLastCheckpointPreservingLogDeletionIterator(maxTimestamp = 100000)( + inputData = Seq(Delta(1), CheckpointAndDelta(2), Delta(3), Delta(4), CheckpointAndDelta(5), + Delta(6), Delta(7)), + deletedGroupCount = 4 + ) + + // Filter the last 2 checkpoints because of maxTimestamp and then delete everything until the + // last complete checkpoint (exclusive). + testLastCheckpointPreservingLogDeletionIterator(maxTimestamp = 5)( + inputData = Seq(CheckpointAndDelta(1), CheckpointAndDelta(2), CheckpointAndDelta(3), + IncompleteCheckpointAndDelta(4), CheckpointAndDelta(6), CheckpointAndDelta(7)), + deletedGroupCount = 2 + ) + + // Delete nothing when you don't find a complete checkpoint + testLastCheckpointPreservingLogDeletionIterator(maxTimestamp = 100000)( + inputData = Seq(IncompleteCheckpointAndDelta(1), IncompleteCheckpoint(2), + IncompleteCheckpointAndDelta(3)), + deletedGroupCount = 0 + ) + + // Delete everything until the last complete checkpoint + testLastCheckpointPreservingLogDeletionIterator(maxTimestamp = 100000)( + inputData = Seq(IncompleteCheckpointAndDelta(1), CheckpointAndDelta(2), + IncompleteCheckpoint(3), IncompleteCheckpointAndDelta(4), CheckpointAndDelta(5), + CheckpointAndDelta(6), IncompleteCheckpointAndDelta(7), IncompleteCheckpoint(8)), + deletedGroupCount = 5 + ) + } + test("[SPARK-45383] Time travel on a non-existing table should throw AnalysisException") { intercept[AnalysisException] { spark.sql("SELECT * FROM not_existing VERSION AS OF 0")