diff --git a/core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 b/core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
index c2e7a0bf1ae..82f626cb717 100644
--- a/core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
+++ b/core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -87,8 +87,11 @@ statement
     | ALTER TABLE table=qualifiedName
         DROP CONSTRAINT (IF EXISTS)? name=identifier                    #dropTableConstraint
     | OPTIMIZE (path=STRING | table=qualifiedName)
-        (WHERE partitionPredicate = predicateToken)?
+        (WHERE partitionPredicate=predicateToken)?
         (zorderSpec)?                                                   #optimizeTable
+    | REORG TABLE table=qualifiedName
+        (WHERE partitionPredicate=predicateToken)?
+        APPLY LEFT_PAREN PURGE RIGHT_PAREN                              #reorgTable
     | SHOW COLUMNS (IN | FROM) tableName=qualifiedName
         ((IN | FROM) schemaName=identifier)?                            #showColumns
     | cloneTableHeader SHALLOW CLONE source=qualifiedName clause=temporalClause?
@@ -210,6 +213,7 @@ nonReserved
     | CONVERT | TO | DELTA | PARTITIONED | BY
     | DESC | DESCRIBE | LIMIT | DETAIL
     | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
+    | REORG | APPLY | PURGE
     | RESTORE | AS | OF | ZORDER | LEFT_PAREN | RIGHT_PAREN
     | SHOW | COLUMNS | IN | FROM
     | NO | STATISTICS
@@ -219,6 +223,7 @@ nonReserved
 // Define how the keywords above should appear in a user's SQL statement.
 ADD: 'ADD';
 ALTER: 'ALTER';
+APPLY: 'APPLY';
 AS: 'AS';
 BY: 'BY';
 CHECK: 'CHECK';
@@ -255,7 +260,9 @@ NULL: 'NULL';
 OF: 'OF';
 OR: 'OR';
 OPTIMIZE: 'OPTIMIZE';
+REORG: 'REORG';
 PARTITIONED: 'PARTITIONED';
+PURGE: 'PURGE';
 REPLACE: 'REPLACE';
 RESTORE: 'RESTORE';
 RETAIN: 'RETAIN';
diff --git a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
index b4309e98d9e..e24bf68a1e4 100644
--- a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
+++ b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -346,7 +346,29 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
     OptimizeTableCommand(
       Option(ctx.path).map(string),
       Option(ctx.table).map(visitTableIdentifier),
-      Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq, Map.empty)(interleaveBy)
+      Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
+      Map.empty)(interleaveBy)
+  }
+
+  /**
+   * Creates a [[DeltaReorgTable]] logical plan.
+   * Examples:
+   * {{{
+   *   -- Physically delete dropped rows and columns of target table
+   *   REORG TABLE (delta.`/path/to/table` | delta_table_name)
+   *     [WHERE partition_predicate] APPLY (PURGE)
+   * }}}
+   */
+  override def visitReorgTable(ctx: ReorgTableContext): AnyRef = withOrigin(ctx) {
+    if (ctx.table == null) {
+      throw new ParseException("REORG command requires a file path or table name.", ctx)
+    }
+
+    val targetIdentifier = visitTableIdentifier(ctx.table)
+    val tableNameParts = targetIdentifier.database.toSeq :+ targetIdentifier.table
+    val targetTable = createUnresolvedTable(tableNameParts, "REORG")
+
+    DeltaReorgTable(targetTable)(Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)
   }
 
   override def visitDescribeDeltaDetail(
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 243ac0479d6..bb799c75d3b 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -485,6 +485,14 @@ class DeltaAnalysis(session: SparkSession)
 
       DeltaMergeInto.resolveReferencesAndSchema(deltaMerge, conf)(tryResolveReferences(session))
 
+    case reorg@DeltaReorgTable(_@ResolvedTable(_, _, t, _)) =>
+      t match {
+        case table: DeltaTableV2 =>
+          DeltaReorgTableCommand(table)(reorg.predicates)
+        case _ =>
+          throw DeltaErrors.notADeltaTable(t.name())
+      }
+
     case deltaMerge: DeltaMergeInto =>
       val d = if (deltaMerge.childrenResolved && !deltaMerge.resolved) {
         DeltaMergeInto.resolveReferencesAndSchema(deltaMerge, conf)(tryResolveReferences(session))
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
new file mode 100644
index 00000000000..c8d98cf4cc9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands
+
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}
+
+case class DeltaReorgTable(target: LogicalPlan)(val predicates: Seq[String]) extends UnaryCommand {
+
+  def child: LogicalPlan = target
+
+  protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(target = newChild)(predicates)
+
+  override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil
+}
+
+/**
+ * The PURGE command: rewrites files with deletion vectors to physically remove soft-deleted rows.
+ */
+case class DeltaReorgTableCommand(target: DeltaTableV2)(val predicates: Seq[String])
+  extends OptimizeTableCommandBase with LeafCommand with IgnoreCachedData {
+
+  override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val command = OptimizeTableCommand(
+      Option(target.path.toString),
+      target.catalogTable.map(_.identifier),
+      predicates,
+      options = Map.empty,
+      optimizeContext = DeltaOptimizeContext(
+        isPurge = true,
+        minFileSize = Some(0L),
+        maxDeletedRowsRatio = Some(0d))
+    )(zOrderBy = Nil)
+    command.run(sparkSession)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 70e8e327b9d..0b40cdc33ea 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -109,7 +109,9 @@ case class OptimizeTableCommand(
     path: Option[String],
     tableId: Option[TableIdentifier],
     userPartitionPredicates: Seq[String],
-    options: Map[String, String])(val zOrderBy: Seq[UnresolvedAttribute])
+    options: Map[String, String],
+    optimizeContext: DeltaOptimizeContext = DeltaOptimizeContext()
+)(val zOrderBy: Seq[UnresolvedAttribute])
   extends OptimizeTableCommandBase with LeafRunnableCommand {
 
   override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
@@ -138,7 +140,34 @@ case class OptimizeTableCommand(
     validateZorderByColumns(sparkSession, txn, zOrderBy)
     val zOrderByColumns = zOrderBy.map(_.name).toSeq
 
-    new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns).optimize()
+    new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, optimizeContext)
+      .optimize()
+  }
+}
+
+/**
+ * Stores all runtime context information that can control the execution of the OPTIMIZE command.
+ *
+ * @param isPurge Whether the rewriting task is only for purging soft-deleted data instead of
+ *                for compaction. If [[isPurge]] is true, only files with DVs will be selected
+ *                for compaction.
+ * @param minFileSize Files which are smaller than this threshold will be selected for compaction.
+ *                    If not specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE]] will be used.
+ *                    This parameter must be set to `0` when [[isPurge]] is true.
+ * @param maxDeletedRowsRatio Files with a ratio of soft-deleted rows to the total rows larger than
+ *                            this threshold will be rewritten by the OPTIMIZE command. If not
+ *                            specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
+ *                            will be used. This parameter must be set to `0` when [[isPurge]] is
+ *                            true.
+ */
+case class DeltaOptimizeContext(
+    isPurge: Boolean = false,
+    minFileSize: Option[Long] = None,
+    maxDeletedRowsRatio: Option[Double] = None) {
+  if (isPurge) {
+    require(
+      minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
+      "minFileSize and maxDeletedRowsRatio must be 0 when running PURGE.")
   }
 }
 
@@ -154,7 +183,8 @@ class OptimizeExecutor(
     sparkSession: SparkSession,
     txn: OptimisticTransaction,
     partitionPredicate: Seq[Expression],
-    zOrderByColumns: Seq[String])
+    zOrderByColumns: Seq[String],
+    optimizeContext: DeltaOptimizeContext)
   extends DeltaCommand with SQLMetricsReporting with Serializable {
 
   /** Timestamp to use in [[FileAction]] */
@@ -164,18 +194,16 @@ class OptimizeExecutor(
 
   def optimize(): Seq[Row] = {
     recordDeltaOperation(txn.deltaLog, "delta.optimize") {
-      val minFileSize = sparkSession.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
-      val maxFileSize = sparkSession.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
-      require(minFileSize > 0, "minFileSize must be > 0")
-      require(maxFileSize > 0, "maxFileSize must be > 0")
+      val minFileSize = optimizeContext.minFileSize.getOrElse(
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
+      val maxFileSize =
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
+      val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))
 
       val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
       val partitionSchema = txn.metadata.partitionSchema
 
-      val maxDeletedRowsRatio = sparkSession.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO)
       val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
       val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
 
diff --git a/core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
index 4088081761f..c18787474ad 100644
--- a/core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
+++ b/core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -19,11 +19,10 @@ package io.delta.sql.parser
 import io.delta.tables.execution.VacuumTableCommand
 
 import org.apache.spark.sql.delta.CloneTableSQLTestUtils
-import org.apache.spark.sql.delta.commands.OptimizeTableCommand
-
+import org.apache.spark.sql.delta.commands.{OptimizeTableCommand, DeltaReorgTable}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -120,6 +119,60 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
       Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
   }
 
+  private def targetPlanForTable(tableParts: String*): UnresolvedTable =
+    UnresolvedTable(tableParts.toSeq, "REORG", relationTypeMismatchHint = None)
+
+  test("REORG command is parsed as expected") {
+    val parser = new DeltaSqlParser(null)
+
+    assert(parser.parsePlan("REORG TABLE tbl APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+
+    assert(parser.parsePlan("REORG TABLE tbl_${system:spark.testing} APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("tbl_true"))(Seq.empty))
+
+    withSQLConf("tbl_var" -> "tbl") {
+      assert(parser.parsePlan("REORG TABLE ${tbl_var} APPLY (PURGE)") ===
+        DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+
+      assert(parser.parsePlan("REORG TABLE ${spark:tbl_var} APPLY (PURGE)") ===
+        DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+
+      assert(parser.parsePlan("REORG TABLE ${sparkconf:tbl_var} APPLY (PURGE)") ===
+        DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+
+      assert(parser.parsePlan("REORG TABLE ${hiveconf:tbl_var} APPLY (PURGE)") ===
+        DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+
+      assert(parser.parsePlan("REORG TABLE ${hivevar:tbl_var} APPLY (PURGE)") ===
+        DeltaReorgTable(targetPlanForTable("tbl"))(Seq.empty))
+    }
+
+    assert(parser.parsePlan("REORG TABLE delta.`/path/to/tbl` APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("delta", "/path/to/tbl"))(Seq.empty))
+
+    assert(parser.parsePlan("REORG TABLE tbl WHERE part = 1 APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("tbl"))(Seq("part = 1")))
+  }
+
+  test("REORG command new tokens are non-reserved keywords") {
+    // new keywords: REORG, APPLY, PURGE
+    val parser = new DeltaSqlParser(null)
+
+    // Use the new keywords in table name
+    assert(parser.parsePlan("REORG TABLE reorg APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("reorg"))(Seq.empty))
+    assert(parser.parsePlan("REORG TABLE apply APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("apply"))(Seq.empty))
+    assert(parser.parsePlan("REORG TABLE purge APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("purge"))(Seq.empty))
+
+    // Use the new keywords in column name
+    assert(parser.parsePlan(
+      "REORG TABLE tbl WHERE reorg = 1 AND apply = 2 AND purge = 3 APPLY (PURGE)") ===
+      DeltaReorgTable(targetPlanForTable("tbl"))(Seq("reorg = 1 AND apply = 2 AND purge = 3")))
+  }
+
   // scalastyle:off argcount
   private def checkCloneStmt(
       parser: DeltaSqlParser,
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
index 5f7b3070cd3..ba01c377289 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
@@ -172,6 +172,14 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
     txn.commit(actions, Truncate())
   }
 
+  protected def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = {
+    val version = log.update().version
+    val allFiles = log.getChanges(version).toSeq.head._2
+    val add = allFiles.collect { case a: AddFile => a }
+    val remove = allFiles.collect { case r: RemoveFile => r }
+    (add, remove)
+  }
+
   protected def serializeRoaringBitmapArrayWithDefaultFormat(
       dv: RoaringBitmapArray): Array[Byte] = {
     val serializationFormat = RoaringBitmapArrayFormat.Portable
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala
index c3cab478af0..6710a18096e 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala
@@ -600,10 +600,8 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest
         subClass = ExistingDeletionVectorsWithIncrementalManifestGeneration) {
         setEnabledIncrementalManifest(tablePath, enabled = true)
       }
-      // Run optimize to delete the DVs and rewrite the data files
-      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO.key -> "0.00001") {
-        spark.sql(s"OPTIMIZE delta.`$tablePath`")
-      }
+      // Purge the DVs and rewrite the data files
+      spark.sql(s"REORG TABLE delta.`$tablePath` APPLY (PURGE)")
       assert(getFilesWithDeletionVectors(deltaLog).isEmpty)
       // Now it should work.
       setEnabledIncrementalManifest(tablePath, enabled = true)
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
index 2843153fc20..650b0e990ac 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
@@ -831,12 +831,10 @@ class DeltaVacuumSuite
     // Helper method to remove the DVs in Delta table and rewrite the data files
     def purgeDVs(tableName: String): Unit = {
       withSQLConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO.key -> "0.0001",
-        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> "2",
         // Set the max file size to low so that we always rewrite the single file without DVs
         // and not combining with other data files.
         DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "2") {
-        spark.sql(s"OPTIMIZE $tableName")
+        spark.sql(s"REORG TABLE $tableName APPLY (PURGE)")
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
index b1c46e9f124..3b1a3533a2d 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
@@ -551,14 +551,6 @@ class DeletionVectorsSuite extends QueryTest
     }
   }
 
-  private def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = {
-    val version = log.update().version
-    val allFiles = log.getChanges(version).toSeq.head._2
-    val add = allFiles.collect { case a: AddFile => a }
-    val remove = allFiles.collect { case r: RemoveFile => r }
-    (add, remove)
-  }
-
   private def assertPlanContains(queryDf: DataFrame, expected: String): Unit = {
     val optimizedPlan = queryDf.queryExecution.analyzed.toString()
     assert(optimizedPlan.contains(expected), s"Plan is missing `$expected`: $optimizedPlan")
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/optimize/DeltaReorgSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/optimize/DeltaReorgSuite.scala
new file mode 100644
index 00000000000..715d952a86c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/optimize/DeltaReorgSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.optimize
+
+import org.apache.spark.sql.delta.DeletionVectorsTestUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DeltaReorgSuite extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLCommandTest
+  with DeletionVectorsTestUtils {
+
+  import testImplicits._
+
+  def executePurge(table: String, condition: Option[String] = None): Unit = {
+    condition match {
+      case Some(cond) => sql(s"REORG TABLE delta.`$table` WHERE $cond APPLY (PURGE)")
+      case None => sql(s"REORG TABLE delta.`$table` APPLY (PURGE)")
+    }
+  }
+
+  test("Purge DVs will combine small files") {
+    val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF
+    withTempDeltaTable(targetDf) { (_, log) =>
+      val path = log.dataPath.toString
+
+      sql(s"DELETE FROM delta.`$path` WHERE id IN (0, 99)")
+      assert(log.update().allFiles.filter(_.deletionVector != null).count() === 2)
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "1073741824") { // 1gb
+        executePurge(path)
+      }
+      val (addFiles, _) = getFileActionsInLastVersion(log)
+      assert(addFiles.size === 1, "files should be combined")
+      assert(addFiles.forall(_.deletionVector === null))
+      checkAnswer(
+        sql(s"SELECT * FROM delta.`$path`"),
+        (1 to 98).toDF())
+    }
+  }
+
+  test("Purge DVs") {
+    val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF()
+    withTempDeltaTable(targetDf) { (_, log) =>
+      val path = log.dataPath.toString
+
+      sql(s"DELETE FROM delta.`$path` WHERE id IN (0, 99)")
+      assert(log.update().allFiles.filter(_.deletionVector != null).count() === 2)
+
+      // First purge
+      executePurge(path)
+      val (addFiles, _) = getFileActionsInLastVersion(log)
+      assert(addFiles.size === 1) // two files are combined
+      assert(addFiles.forall(_.deletionVector === null))
+      checkAnswer(
+        sql(s"SELECT * FROM delta.`$path`"),
+        (1 to 98).toDF())
+
+      // Second purge is a noop
+      val versionBefore = log.update().version
+      executePurge(path)
+      val versionAfter = log.update().version
+      assert(versionBefore === versionAfter)
+    }
+  }
+
+  test("Purge a non-DV table is a noop") {
+    val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF()
+    withTempDeltaTable(targetDf, enableDVs = false) { (_, log) =>
+      val versionBefore = log.update().version
+      executePurge(log.dataPath.toString)
+      val versionAfter = log.update().version
+      assert(versionBefore === versionAfter)
+    }
+  }
+
+  test("Purge some partitions of a table with DV") {
+    val targetDf = spark.range(0, 100, 1, numPartitions = 1)
+      .withColumn("part", col("id") % 4)
+      .toDF()
+    withTempDeltaTable(targetDf, partitionBy = Seq("part")) { (_, log) =>
+      val path = log.dataPath
+      // Delete one row from each partition
+      sql(s"DELETE FROM delta.`$path` WHERE id IN (48, 49, 50, 51)")
+      val (addFiles1, _) = getFileActionsInLastVersion(log)
+      assert(addFiles1.size === 4)
+      assert(addFiles1.forall(_.deletionVector !== null))
+      // PURGE two partitions
+      sql(s"REORG TABLE delta.`$path` WHERE part IN (0, 2) APPLY (PURGE)")
+      val (addFiles2, _) = getFileActionsInLastVersion(log)
+      assert(addFiles2.size === 2)
+      assert(addFiles2.forall(_.deletionVector === null))
+    }
+  }
+}
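For quick reference, the SQL surface introduced by this patch can be exercised as below; the table name, path, and partition column are illustrative and mirror the forms covered by the grammar rule, the parser tests, and DeltaReorgSuite above.

-- Rewrite every file that carries a deletion vector, physically removing soft-deleted rows
REORG TABLE delta.`/path/to/tbl` APPLY (PURGE)

-- Restrict the rewrite to selected partitions of a named table
REORG TABLE tbl WHERE part IN (0, 2) APPLY (PURGE)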