From a66e67ee670cc44ea20a95abf1311026596ba5ca Mon Sep 17 00:00:00 2001
From: Tom van Bussel
Date: Fri, 19 May 2023 15:33:40 +0200
Subject: [PATCH 1/2] Add Default Row Commit Version to AddFile and RemoveFile

---
 .../apache/spark/sql/delta/Checkpoints.scala  |   3 +-
 .../sql/delta/DefaultRowCommitVersion.scala   |  36 ++++
 .../sql/delta/OptimisticTransaction.scala     |  11 +-
 .../org/apache/spark/sql/delta/Snapshot.scala |   3 +-
 .../spark/sql/delta/actions/actions.scala     |  11 +-
 .../sql/delta/commands/CloneTableBase.scala   |   5 +-
 .../spark/sql/delta/CheckpointsSuite.scala    |   3 +-
 .../spark/sql/delta/DeltaLogSuite.scala       |   2 +-
 .../DefaultRowCommitVersionSuite.scala        | 158 ++++++++++++++++++
 9 files changed, 222 insertions(+), 10 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
 create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index b73da988951..09de77f1019 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -622,7 +622,8 @@ object Checkpoints extends DeltaLogging {
           col("add.dataChange"), // actually not really useful here
           col("add.tags"),
           col("add.deletionVector"),
-          col("add.baseRowId")) ++
+          col("add.baseRowId"),
+          col("add.defaultRowCommitVersion")) ++
           additionalCols: _*
       ))
     )
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala b/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
new file mode 100644
index 00000000000..a1b3eeb04bf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.delta.actions.{Action, AddFile, Protocol}
+
+object DefaultRowCommitVersion {
+  def assignIfMissing(
+      protocol: Protocol,
+      actions: Iterator[Action],
+      version: Long): Iterator[Action] = {
+    if (!RowTracking.isSupported(protocol)) {
+      return actions
+    }
+    actions.map {
+      case a: AddFile if a.defaultRowCommitVersion.isEmpty =>
+        a.copy(defaultRowCommitVersion = Some(version))
+      case a =>
+        a
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 91622bbf57b..74042456dfb 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1037,7 +1037,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       }
 
       val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
-        doCommitRetryIteratively(snapshot.version + 1, currentTransactionInfo, isolationLevelToUse)
+        doCommitRetryIteratively(
+          getFirstAttemptVersion, currentTransactionInfo, isolationLevelToUse)
       logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
       (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo.actions)
     } catch {
@@ -1084,7 +1085,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       context: Map[String, String],
       metrics: Map[String, String]): (Long, Snapshot) = {
     commitStartNano = System.nanoTime()
-    val attemptVersion = readVersion + 1
+    val attemptVersion = getFirstAttemptVersion
     try {
       val commitInfo = CommitInfo(
         time = clock.getTimeMillis(),
@@ -1134,6 +1135,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     }
 
     allActions = RowId.assignFreshRowIds(spark, protocol, snapshot, allActions)
+    allActions = DefaultRowCommitVersion
+      .assignIfMissing(protocol, allActions, getFirstAttemptVersion)
 
     if (readVersion < 0) {
       deltaLog.createLogDirectory()
@@ -1354,6 +1357,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
 
     finalActions = RowId.assignFreshRowIds(spark, protocol, snapshot, finalActions.toIterator).toList
+    finalActions = DefaultRowCommitVersion
+      .assignIfMissing(protocol, finalActions.toIterator, getFirstAttemptVersion).toList
 
     // We make sure that this isn't an appendOnly table as we check if we need to delete
     // files.
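
A minimal standalone sketch of the semantics that assignIfMissing (above) wires into the commit path, using simplified stand-in case classes rather than the real Delta actions; all names below are illustrative only. Files committed without a defaultRowCommitVersion are stamped with the first-attempt commit version, while values that are already set are left untouched:

object DefaultRowCommitVersionSketch extends App {
  // Stand-in for the real AddFile action (illustrative only).
  case class StubAddFile(path: String, defaultRowCommitVersion: Option[Long] = None)

  // Mirrors DefaultRowCommitVersion.assignIfMissing for a row-tracking-enabled table:
  // only files that do not yet carry a version receive the first-attempt commit version.
  def assignIfMissing(files: Iterator[StubAddFile], version: Long): Iterator[StubAddFile] =
    files.map {
      case f if f.defaultRowCommitVersion.isEmpty => f.copy(defaultRowCommitVersion = Some(version))
      case f => f
    }

  val assigned = assignIfMissing(
    Iterator(StubAddFile("part-00000.parquet"), StubAddFile("part-00001.parquet", Some(1L))),
    version = 3L).toList

  assert(assigned(0).defaultRowCommitVersion.contains(3L)) // newly added file gets the commit version
  assert(assigned(1).defaultRowCommitVersion.contains(1L)) // an existing value is preserved
  println(assigned)
}
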
@@ -1710,6 +1715,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     conflictChecker.checkConflicts()
   }
 
+  protected def getFirstAttemptVersion: Long = readVersion + 1L
+
   /** Returns the next attempt version given the last attempted version */
   protected def getNextAttemptVersion(previousAttemptVersion: Long): Long = {
     val latestSnapshot = deltaLog.update()
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index 2c57d86b3d8..b8756814731 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -253,7 +253,8 @@ class Snapshot(
           col(ADD_STATS_TO_USE_COL_NAME).as("stats"),
           col("add.tags"),
           col("add.deletionVector"),
-          col("add.baseRowId")
+          col("add.baseRowId"),
+          col("add.defaultRowCommitVersion")
         )))
       .withColumn("remove", when(
         col("remove.path").isNotNull,
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index 6ee7199c504..abd2bd0123d 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -562,7 +562,9 @@ case class AddFile(
     override val tags: Map[String, String] = null,
     deletionVector: DeletionVectorDescriptor = null,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
-    baseRowId: Option[Long] = None
+    baseRowId: Option[Long] = None,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    defaultRowCommitVersion: Option[Long] = None
 ) extends FileAction {
   require(path.nonEmpty)
 
@@ -580,7 +582,8 @@ case class AddFile(
       path, Some(timestamp), dataChange,
       extendedFileMetadata = Some(true), partitionValues, Some(size), newTags,
       deletionVector = deletionVector,
-      baseRowId = baseRowId
+      baseRowId = baseRowId,
+      defaultRowCommitVersion = defaultRowCommitVersion
     )
     removedFile.numLogicalRecords = numLogicalRecords
     removedFile.estLogicalFileSize = estLogicalFileSize
@@ -802,7 +805,9 @@ case class RemoveFile(
     override val tags: Map[String, String] = null,
     deletionVector: DeletionVectorDescriptor = null,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
-    baseRowId: Option[Long] = None
+    baseRowId: Option[Long] = None,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    defaultRowCommitVersion: Option[Long] = None
 ) extends FileAction {
 
   override def wrap: SingleAction = SingleAction(remove = this)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
index 614c649daaa..7d3ba530567 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
@@ -278,8 +278,11 @@ abstract class CloneTableBase(
         val copiedFile = fileToCopy.copy(dataChange = true)
         opName match {
           case CloneTableCommand.OP_NAME =>
+            // CLONE does not preserve Row IDs and Commit Versions
+            copiedFile.copy(baseRowId = None, defaultRowCommitVersion = None)
+          case RestoreTableCommand.OP_NAME =>
+            // RESTORE preserves Row IDs and Commit Versions
             copiedFile
-          case RestoreTableCommand.OP_NAME => copiedFile
         }
       }
     val sourceName = sourceTable.name
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 25cb6884281..008e5d24d68 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -226,7 +226,8 @@ class CheckpointsSuite extends QueryTest
       "partitionValues",
       "size",
       "deletionVector",
-      "baseRowId")
+      "baseRowId",
+      "defaultRowCommitVersion")
 
     val tablePath = tempDir.getAbsolutePath
     // Append rows [0, 9] to table and merge tablePath.
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 26a4a6e9cdd..54d6eed5662 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -313,7 +313,7 @@ class DeltaLogSuite extends QueryTest
 
       assert(log.update().allFiles.collect().find(_.path == "foo")
         // `dataChange` is set to `false` after replaying logs.
-        === Some(add2.copy(dataChange = false)))
+        === Some(add2.copy(dataChange = false, defaultRowCommitVersion = Some(2))))
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
new file mode 100644
index 00000000000..ae0eeccb880
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.rowtracking
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
+import org.apache.spark.sql.delta.rowid.RowIdTestUtils
+import org.apache.spark.sql.delta.stats.DeltaScan
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DefaultRowCommitVersionSuite extends QueryTest with SharedSparkSession with RowIdTestUtils {
+
+  def expectedCommitVersionsForAllFiles(deltaLog: DeltaLog): Map[String, Long] = {
+    val commitVersionForFiles = mutable.Map.empty[String, Long]
+    deltaLog.getChanges(startVersion = 0).foreach { case (commitVersion, actions) =>
+      actions.foreach {
+        case a: AddFile if !commitVersionForFiles.contains(a.path) =>
+          commitVersionForFiles += a.path -> commitVersion
+        case r: RemoveFile if commitVersionForFiles.contains(r.path) =>
+          assert(r.defaultRowCommitVersion.contains(commitVersionForFiles(r.path)))
+        case _ =>
+          // Do nothing
+      }
+    }
+    commitVersionForFiles.toMap
+  }
+
+  test("defaultRowCommitVersion is not set when feature is disabled") {
+    withRowTrackingEnabled(enabled = false) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("overwrite").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.isEmpty)
+        }
+      }
+    }
+  }
+
+  test("checkpoint preserves defaultRowCommitVersion") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+
+        deltaLog.checkpoint(deltaLog.update())
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+
+  test("data skipping reads defaultRowCommitVersion") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        val filters: Seq[Expression] = Seq(col("id = 150").expr)
+        val scan: DeltaScan = deltaLog.update().filesForScan(filters)
+
+        scan.files.foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+
+  test("clone does not preserve default row commit versions") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { sourceDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+
+        withTable("target") {
+          spark.sql(s"CREATE TABLE target SHALLOW CLONE delta.`${sourceDir.getAbsolutePath}`")
+
+          val targetLog = DeltaLog.forTable(spark, TableIdentifier("target"))
+          targetLog.update().allFiles.collect().foreach { f =>
+            assert(f.defaultRowCommitVersion.contains(0L))
+          }
+        }
+      }
+    }
+  }
+
+  test("restore does preserve default row commit versions") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        spark.sql(s"RESTORE delta.`${tempDir.getAbsolutePath}` TO VERSION AS OF 1")
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+}

From 18b77adf00cf5cdf5c9d29274c2757bdce0ee1b5 Mon Sep 17 00:00:00 2001
From: Tom van Bussel
Date: Thu, 25 May 2023 11:13:28 +0200
Subject: [PATCH 2/2] Address Ryan's feedback

---
 .../org/apache/spark/sql/delta/OptimisticTransaction.scala   | 1 +
 .../delta/rowtracking/DefaultRowCommitVersionSuite.scala     | 7 ++-----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 74042456dfb..532d84da49e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1715,6 +1715,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     conflictChecker.checkConflicts()
   }
 
+  /** Returns the version that the first attempt will try to commit at. */
   protected def getFirstAttemptVersion: Long = readVersion + 1L
 
   /** Returns the next attempt version given the last attempted version */
   protected def getNextAttemptVersion(previousAttemptVersion: Long): Long = {
     val latestSnapshot = deltaLog.update()
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
index ae0eeccb880..e742c564c2a 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
@@ -21,12 +21,9 @@ import scala.collection.mutable
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
 import org.apache.spark.sql.delta.rowid.RowIdTestUtils
-import org.apache.spark.sql.delta.stats.DeltaScan
 
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -102,8 +99,8 @@ class DefaultRowCommitVersionSuite extends QueryTest with SharedSparkSession wit
         val deltaLog = DeltaLog.forTable(spark, tempDir)
         val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
 
-        val filters: Seq[Expression] = Seq(col("id = 150").expr)
-        val scan: DeltaScan = deltaLog.update().filesForScan(filters)
+        val filters = Seq(col("id = 150").expr)
+        val scan = deltaLog.update().filesForScan(filters)
 
         scan.files.foreach { f =>
           assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
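
As an illustration only (not part of the patch): a small usage sketch that mirrors the new test suite, showing how the field can be inspected on a row-tracking-enabled table once this change is in place. The table path is hypothetical and spark is assumed to be an existing SparkSession:

import org.apache.spark.sql.delta.DeltaLog

val deltaLog = DeltaLog.forTable(spark, "/tmp/example-delta-table") // hypothetical path
deltaLog.update().allFiles.collect().foreach { f =>
  // None when row tracking is disabled; otherwise the version of the commit that added the file.
  println(s"${f.path} -> defaultRowCommitVersion = ${f.defaultRowCommitVersion}")
}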