RESTORE support for Delta tables with deletion vectors
This PR is part of the feature: Support Delta tables with deletion vectors (more details at delta-io#1485)

It adds support for running RESTORE on a Delta table with deletion vectors. The main change is to take AddFile.deletionVector into consideration when comparing the version being restored to against the current version, in order to find the list of data files to add and remove.
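
At a high level, each file is keyed by its path plus the unique ID of its deletion vector, and the add/remove lists come from anti-joins on that composite key, using null-safe equality so files without DVs still match. A minimal sketch of the idea (not the exact code in the diff below; it assumes org.apache.spark.sql.functions.col and the Delta encoder implicits are in scope, and the column names are illustrative):

    // Sketch only: relies on the (AddFile, String) encoder added in this PR.
    val src = snapshotToRestoreFiles.map(f => (f, f.path)).toDF("addFile", "path")
      .withColumn("dvId", DeletionVectorDescriptor.uniqueIdExpression(col("addFile.deletionVector")))
    val tgt = latestSnapshotFiles.map(f => (f, f.path)).toDF("addFile", "path")
      .withColumn("dvId", DeletionVectorDescriptor.uniqueIdExpression(col("addFile.deletionVector")))
    val sameFile = src("path") === tgt("path") && src("dvId") <=> tgt("dvId")
    val filesToAdd    = src.join(tgt, sameFile, "left_anti") // at the restore version, not in the current one
    val filesToRemove = tgt.join(src, sameFile, "left_anti") // in the current version, not at the restore version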

Added tests

Closes delta-io#1735
vkorukanti committed May 6, 2023
1 parent 9a114eb commit d587e4c
Showing 6 changed files with 292 additions and 19 deletions.
@@ -227,6 +227,22 @@ object DeletionVectorDescriptor {
sizeInBytes = data.length,
cardinality = cardinality)

/**
* This produces the same output as [[DeletionVectorDescriptor.uniqueId]] but as a column
* expression, so it can be used directly in a Spark query.
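*
* For example (illustrative values): a descriptor with storageType "u",
* pathOrInlineDv "<encodedUuid>" and offset 1 yields "u<encodedUuid>@1"; when offset is
* null the "@<offset>" suffix is omitted. Callers such as RestoreTableCommand can apply it
* as `df.withColumn("dvId", uniqueIdExpression(col("addFile.deletionVector")))`.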
*/
def uniqueIdExpression(deletionVectorCol: Column): Column = {
when(deletionVectorCol("offset").isNotNull,
concat(
deletionVectorCol("storageType"),
deletionVectorCol("pathOrInlineDv"),
lit('@'),
deletionVectorCol("offset")))
.otherwise(concat(
deletionVectorCol("storageType"),
deletionVectorCol("pathOrInlineDv")))
}

/**
* Return the unique path under `parentPath` that is based on `id`.
*
@@ -22,17 +22,18 @@ import scala.collection.JavaConverters._
import scala.util.{Success, Try}

import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, Snapshot}
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, RemoveFile}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.functions.{column, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.IGNORE_MISSING_FILES
import org.apache.spark.sql.types.LongType
@@ -120,26 +121,63 @@ case class RestoreTableCommand(

import org.apache.spark.sql.delta.implicits._

val filesToAdd = snapshotToRestoreFiles
.join(
latestSnapshotFiles,
snapshotToRestoreFiles("path") === latestSnapshotFiles("path"),
"left_anti")
.as[AddFile]
// If either source version or destination version contains DVs,
// we have to take them into account during deduplication.
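// Otherwise a file whose parquet data is unchanged but whose DV differs (e.g. rows were
// deleted after the version being restored to) would be treated as identical and neither
// removed nor re-added, so the restored table would keep the newer DV and still hide those rows.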
val targetMayHaveDVs = DeletionVectorUtils.deletionVectorsReadable(latestSnapshot)
val sourceMayHaveDVs = DeletionVectorUtils.deletionVectorsReadable(snapshotToRestore)

val normalizedSourceWithoutDVs = snapshotToRestoreFiles.mapPartitions { files =>
files.map(file => (file, file.path))
}.toDF("srcAddFile", "srcPath")
val normalizedTargetWithoutDVs = latestSnapshotFiles.mapPartitions { files =>
files.map(file => (file, file.path))
}.toDF("tgtAddFile", "tgtPath")

def addDVsToNormalizedDF(
mayHaveDVs: Boolean,
dvIdColumnName: String,
dvAccessColumn: Column,
normalizedDf: DataFrame): DataFrame = {
if (mayHaveDVs) {
normalizedDf.withColumn(
dvIdColumnName,
DeletionVectorDescriptor.uniqueIdExpression(dvAccessColumn))
} else {
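// No DVs can be present: still add the column (as NULL) so the join expression below
// is well-formed and NULL <=> NULL matches files without DVs.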
normalizedDf.withColumn(dvIdColumnName, lit(null))
}
}

val normalizedSource = addDVsToNormalizedDF(
mayHaveDVs = sourceMayHaveDVs,
dvIdColumnName = "srcDeletionVectorId",
dvAccessColumn = column("srcAddFile.deletionVector"),
normalizedDf = normalizedSourceWithoutDVs)

val normalizedTarget = addDVsToNormalizedDF(
mayHaveDVs = targetMayHaveDVs,
dvIdColumnName = "tgtDeletionVectorId",
dvAccessColumn = column("tgtAddFile.deletionVector"),
normalizedDf = normalizedTargetWithoutDVs)

val joinExprs =
column("srcPath") === column("tgtPath") and
// Use the null-safe equality operator (<=>), which treats NULL == NULL as a match.
column("srcDeletionVectorId") <=> column("tgtDeletionVectorId")
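// (lit(null) <=> lit(null) evaluates to true, whereas lit(null) === lit(null) is NULL.)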

val filesToAdd = normalizedSource
.join(normalizedTarget, joinExprs, "left_anti")
.select(column("srcAddFile").as[AddFile])
.map(_.copy(dataChange = true))

val filesToRemove = latestSnapshotFiles
.join(
snapshotToRestoreFiles,
latestSnapshotFiles("path") === snapshotToRestoreFiles("path"),
"left_anti")
.as[AddFile]
val filesToRemove = normalizedTarget
.join(normalizedSource, joinExprs, "left_anti")
.select(column("tgtAddFile").as[AddFile])
.map(_.removeWithTimestamp())

val ignoreMissingFiles = spark
.sessionState
.conf
.getConf(IGNORE_MISSING_FILES)

if (!ignoreMissingFiles) {
checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)
@@ -83,6 +83,10 @@ private[delta] trait DeltaEncoders {
private lazy val _addFileWithIndexEncoder = new DeltaEncoder[(AddFile, Long)]
implicit def addFileWithIndexEncoder: Encoder[(AddFile, Long)] = _addFileWithIndexEncoder.get

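// Used by RestoreTableCommand, which maps snapshot files to (AddFile, path) pairs
// before its deletion-vector-aware anti-joins.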
private lazy val _addFileWithSourcePathEncoder = new DeltaEncoder[(AddFile, String)]
implicit def addFileWithSourcePathEncoder: Encoder[(AddFile, String)] =
_addFileWithSourcePathEncoder.get

private lazy val _deltaHistoryEncoder = new DeltaEncoder[DeltaHistory]
implicit def deltaHistoryEncoder: Encoder[DeltaHistory] = _deltaHistoryEncoder.get

@@ -45,6 +45,7 @@ class DeltaImplicitsSuite extends SparkFunSuite with SharedSparkSession {
testImplict("serializableFileStatus", serializableFileStatusEncoder)
testImplict("indexedFile", indexedFileEncoder)
testImplict("addFileWithIndex", addFileWithIndexEncoder)
testImplict("addFileWithSourcePath", addFileWithSourcePathEncoder)
testImplict("deltaHistoryEncoder", deltaHistoryEncoder)
testImplict("historyCommitEncoder", historyCommitEncoder)
testImplict("snapshotStateEncoder", snapshotStateEncoder)
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDSc
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.util.Utils

trait DeltaTestUtilsBase {

@@ -17,8 +17,11 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.test.DeltaExcludedTestMixin

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.Utils
@@ -55,3 +58,215 @@ class RestoreTableScalaSuite extends RestoreTableSuiteBase {
}
}

class RestoreTableScalaDeletionVectorSuite
extends RestoreTableScalaSuite
with DeletionVectorsTestUtils
with DeltaExcludedTestMixin {

import testImplicits._

override def beforeAll(): Unit = {
super.beforeAll()
enableDeletionVectors(spark.conf)
}
override def excluded: Seq[String] = super.excluded ++
Seq(
// These tests perform a delete to produce a file to vacuum, but with persistent DVs enabled,
// we actually just add a DV to the file instead, so there's no unreferenced file for vacuum.
"restore after vacuum",
"restore after vacuum - cloned table",
// These rely on the new-table protocol version to be lower than the latest,
// but this isn't true for DVs.
"restore downgrade protocol (allowed=true)",
"restore downgrade protocol (allowed=false)",
"restore downgrade protocol with table features (allowed=true)",
"restore downgrade protocol with table features (allowed=false)",
"cdf + RESTORE with write amplification reduction",
"RESTORE doesn't account for session defaults"
)

case class RestoreAndCheckArgs(versionToRestore: Int, expectedResult: DataFrame)
type RestoreAndCheckFunction = RestoreAndCheckArgs => Unit

/**
* Runs `testFun` once restoring by version and once restoring by timestamp.
*
* `testFun` is expected to perform setup before executing the `RestoreAndCheckFunction` and
* cleanup afterwards.
*/
protected def testRestoreByTimestampAndVersion
(testName: String)
(testFun: (String, RestoreAndCheckFunction) => Unit): Unit = {
for (restoreToVersion <- BOOLEAN_DOMAIN) {
val restoringTo = if (restoreToVersion) "version" else "timestamp"
test(testName + s" - restoring to $restoringTo") {
withTempDir { dir =>
val path = dir.toString
val restoreAndCheck: RestoreAndCheckFunction = (args: RestoreAndCheckArgs) => {
val deltaLog = DeltaLog.forTable(spark, path)
if (restoreToVersion) {
restoreTableToVersion(path, args.versionToRestore, isTable = false)
} else {
// Set a custom timestamp for the commit
val desiredDateS = "1996-01-12"
setTimestampToCommitFileAtVersion(
deltaLog,
version = args.versionToRestore,
date = desiredDateS)
// Set all previous versions to something lower, so we don't error out.
for (version <- 0 until args.versionToRestore) {
val previousDateS = "1996-01-11"
setTimestampToCommitFileAtVersion(
deltaLog,
version = version,
date = previousDateS)
}

restoreTableToTimestamp(path, desiredDateS, isTable = false)
}
checkAnswer(spark.read.format("delta").load(path), args.expectedResult)
}
testFun(path, restoreAndCheck)
}
}
}
}

testRestoreByTimestampAndVersion(
"Restoring table with persistent DVs to version without DVs") { (path, restoreAndCheck) =>
val deltaLog = DeltaLog.forTable(spark, path)
val df1 = Seq(1, 2, 3, 4, 5).toDF("id")
val values2 = Seq(6, 7, 8, 9, 10)
val df2 = values2.toDF("id")

// Write all values into version 0.
df1.union(df2).coalesce(1).write.format("delta").save(path) // version 0
checkAnswer(spark.read.format("delta").load(path), expectedAnswer = df1.union(df2))
val snapshotV0 = deltaLog.update()
assert(snapshotV0.version === 0)

// Delete `values2` so that version 1 is `df1`.
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values2.mkString(", ")})") // version 1
assert(getFilesWithDeletionVectors(deltaLog).size > 0)
checkAnswer(spark.read.format("delta").load(path), expectedAnswer = df1)
val snapshotV1 = deltaLog.snapshot
assert(snapshotV1.version === 1)

restoreAndCheck(RestoreAndCheckArgs(versionToRestore = 0, expectedResult = df1.union(df2)))
assert(getFilesWithDeletionVectors(deltaLog).size === 0)
}

testRestoreByTimestampAndVersion(
"Restoring table with persistent DVs to version with DVs") { (path, restoreAndCheck) =>
val deltaLog = DeltaLog.forTable(spark, path)
val df1 = Seq(1, 2, 3, 4, 5).toDF("id")
val values2 = Seq(6, 7)
val df2 = values2.toDF("id")
val values3 = Seq(8, 9, 10)
val df3 = values3.toDF("id")

// Write all values into version 0.
df1.union(df2).union(df3).coalesce(1).write.format("delta").save(path) // version 0

// Delete `values3` and then `values2` (reverse order), so that version 1 is `df1.union(df2)`.
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values3.mkString(", ")})") // version 1
assert(getFilesWithDeletionVectors(deltaLog).size > 0)
checkAnswer(spark.read.format("delta").load(path), expectedAnswer = df1.union(df2))
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values2.mkString(", ")})") // version 2
assert(getFilesWithDeletionVectors(deltaLog).size > 0)

restoreAndCheck(RestoreAndCheckArgs(versionToRestore = 1, expectedResult = df1.union(df2)))
assert(getFilesWithDeletionVectors(deltaLog).size > 0)
}

testRestoreByTimestampAndVersion("Restoring table with persistent DVs to version " +
"without persistent DVs enabled") { (path, restoreAndCheck) =>
withSQLConf(
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "false",
// Disable log clean-up. The test sets the timestamp on commit files so far back
// in time that it would otherwise trigger commit file clean-up as part of [[MetadataCleanup]].
DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.defaultTablePropertyKey -> "false") {
val deltaLog = DeltaLog.forTable(spark, path)
val df1 = Seq(1, 2, 3, 4, 5).toDF("id")
val values2 = Seq(6, 7, 8, 9, 10)
val df2 = values2.toDF("id")

// Write all values into version 0.
df1.union(df2).coalesce(1).write.format("delta").save(path) // version 0
checkAnswer(spark.read.format("delta").load(path), expectedAnswer = df1.union(df2))
val snapshotV0 = deltaLog.update()
assert(snapshotV0.version === 0)
assert(!DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(snapshotV0.metadata))

// Upgrade the table to use DVs.
spark.sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES " +
s"(${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key} = true)")
val snapshotV1 = deltaLog.update()
assert(snapshotV1.version === 1)
assert(DeletionVectorUtils.deletionVectorsReadable(snapshotV1))
assert(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(snapshotV1.metadata))

// Delete `values2` so that version 2 is `df1`.
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values2.mkString(", ")})") // version 2
assert(getFilesWithDeletionVectors(deltaLog).size > 0)
checkAnswer(spark.read.format("delta").load(path), expectedAnswer = df1)
val snapshotV2 = deltaLog.update()
assert(snapshotV2.version === 2)

// Restore to before the version upgrade. Protocol version should be retained (to make the
// history readable), but DV creation should be disabled again.
restoreAndCheck(RestoreAndCheckArgs(versionToRestore = 0, expectedResult = df1.union(df2)))
val snapshotV3 = deltaLog.update()
assert(getFilesWithDeletionVectors(deltaLog).size === 0)
assert(DeletionVectorUtils.deletionVectorsReadable(snapshotV3))
assert(!DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(snapshotV3.metadata))
// Check that we can still read versions that did have DVs.
checkAnswer(
spark.read.format("delta").option("versionAsOf", "2").load(path),
expectedAnswer = df1)
}
}
test("CDF + DV + RESTORE") {
withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
withTempDir { tempDir =>
val df0 = Seq(0, 1).toDF("id") // version 0 = [0, 1]
df0.write.format("delta").save(tempDir.getAbsolutePath)

val df1 = Seq(2).toDF("id") // version 1: append to df0 = [0, 1, 2]
df1.write.mode("append").format("delta").save(tempDir.getAbsolutePath)

val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
deltaTable.delete("id < 1") // version 2: delete (0) = [1, 2]

deltaTable.updateExpr(
"id > 1",
Map("id" -> "4")
) // version 3: update 2 --> 4 = [1, 4]

// version 4: restore to version 2 (delete 4, insert 2) = [1, 2]
restoreTableToVersion(tempDir.getAbsolutePath, 2, false)
checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempDir), 4, 4, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(4, "delete", 4) :: Row(2, "insert", 4) :: Nil
)

// version 5: restore to version 1 (insert 0) = [0, 1, 2]
restoreTableToVersion(tempDir.getAbsolutePath, 1, false)
checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempDir), 5, 5, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(0, "insert", 5) :: Nil
)

// version 6: restore to version 0 (delete 2) = [0, 1]
restoreTableToVersion(tempDir.getAbsolutePath, 0, false)
checkAnswer(
CDCReader.changesToBatchDF(DeltaLog.forTable(spark, tempDir), 6, 6, spark)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP),
Row(2, "delete", 6) :: Nil
)
}
}
}
}
