From 1c290c46788cda198cf523df2f317a76a946d430 Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Fri, 27 Jan 2023 09:51:57 -0800
Subject: [PATCH] Support OPTIMIZE on Delta tables with DVs

This PR is part of the feature: Support reading Delta tables with deletion
vectors (more details at https://github.com/delta-io/delta/issues/1485).

This PR adds support for running OPTIMIZE (file compaction or Z-Order By) on
Delta tables with deletion vectors. It changes the following:

* Selection criteria
  * File compaction: earlier we selected only files with size below
    `optimize.minFileSize` for compaction. Now we also consider the ratio of
    deleted rows in a file: if it is above `optimize.maxDeletedRowsRatio`
    (default 0.05), the file is also selected for compaction, which removes
    its DV.
  * Z-Order: unchanged. We always select all files in the selected
    partitions, so if a file has a DV it is removed as part of the Z-Order.
* Reading the selected files with DVs for OPTIMIZE: we go through the same
  read path as a regular Delta table read, which removes the deleted rows
  (according to the DV) from the scan output.
* Metrics for the removed DVs.

Added tests.

GitOrigin-RevId: b64d8beec8278e6665813642753ef0a19af5c985
---
 .../spark/sql/delta/actions/actions.scala     | 15 ++++
 .../delta/commands/OptimizeTableCommand.scala | 65 ++++++++++++++--
 .../commands/optimize/OptimizeStats.scala     | 14 +++-
 .../sql/delta/sources/DeltaSQLConf.scala      | 10 +++
 .../sql/delta/DeletionVectorsTestUtils.scala  | 34 +++++++++
 .../optimize/OptimizeCompactionSuite.scala    | 75 +++++++++++++++++++
 .../delta/optimize/OptimizeMetricsSuite.scala | 73 ++++++++++++++++--
 7 files changed, 272 insertions(+), 14 deletions(-)
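
Reviewer note (not part of the diff): a rough spark-shell sketch of how the new
selection criterion can be exercised. The path is a placeholder, and it assumes
a build where DELETE persists deletion vectors once the
`delta.enableDeletionVectors` table property is set:

    // Ten 1000-row files, then enable DVs on the table (illustrative setup only).
    val path = "/tmp/optimize_dv_demo"
    spark.range(0, 10000, 1, 10).write.format("delta").save(path)
    spark.sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")

    // Delete ~6% of the rows of the first file; with DVs this writes a deletion
    // vector instead of rewriting the file.
    spark.sql(s"DELETE FROM delta.`$path` WHERE id < 60")

    // Files whose deleted-rows ratio exceeds optimize.maxDeletedRowsRatio
    // (default 0.05) are now selected for rewrite even when they are not below
    // optimize.minFileSize; rewriting drops their DVs.
    spark.sql(s"OPTIMIZE delta.`$path`")
      .select("metrics.deletionVectorStats.*")
      .show()
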
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index b790c570f38..1a21b0ab6a8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -493,6 +493,21 @@ case class AddFile(
   /** Returns the total number of records, including those marked as deleted. */
   @JsonIgnore
   def numPhysicalRecords: Option[Long] = numLogicalRecords.map(_ + numDeletedRecords)
+
+  /** Returns the approximate size of the remaining records after excluding the deleted ones. */
+  @JsonIgnore
+  def estLogicalFileSize: Option[Long] = logicalToPhysicalRecordsRatio.map(n => (n * size).toLong)
+
+  /** Returns the ratio of the logical number of records to the total number of records. */
+  @JsonIgnore
+  def logicalToPhysicalRecordsRatio: Option[Double] = {
+    numLogicalRecords.map(numLogicalRecords =>
+      numLogicalRecords.toDouble / (numLogicalRecords + numDeletedRecords).toDouble)
+  }
+
+  /** Returns the ratio of the number of deleted records to the total number of records. */
+  @JsonIgnore
+  def deletedToPhysicalRecordsRatio: Option[Double] = logicalToPhysicalRecordsRatio.map(1.0d - _)
 }
 
 object AddFile {
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 3c3a1e8c6d1..0dd3f7bdfa4 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -25,7 +25,7 @@ import scala.collection.parallel.immutable.ParVector
 import org.apache.spark.sql.delta.skipping.MultiDimClustering
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaOperations.Operation
-import org.apache.spark.sql.delta.actions.{Action, AddFile, FileAction, RemoveFile}
+import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.commands.optimize._
 import org.apache.spark.sql.delta.files.SQLMetricsReporting
 import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -171,11 +171,12 @@ class OptimizeExecutor(
     require(minFileSize > 0, "minFileSize must be > 0")
     require(maxFileSize > 0, "maxFileSize must be > 0")
 
-    val candidateFiles = txn.filterFiles(partitionPredicate)
+    val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
     val partitionSchema = txn.metadata.partitionSchema
 
-    // select all files in case of multi-dimensional clustering
-    val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
+    val maxDeletedRowsRatio = sparkSession.sessionState.conf.getConf(
+      DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO)
+    val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
     val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
 
     val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
@@ -188,9 +189,10 @@ class OptimizeExecutor(
     val addedFiles = updates.collect { case a: AddFile => a }
     val removedFiles = updates.collect { case r: RemoveFile => r }
+    val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
 
     if (addedFiles.size > 0) {
       val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
-      val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
+      val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
       commitAndRetry(txn, operation, updates, metrics) { newTxn =>
         val newPartitionSchema = newTxn.metadata.partitionSchema
         val candidateSetOld = candidateFiles.map(_.path).toSet
@@ -218,6 +220,11 @@ class OptimizeExecutor(
     optimizeStats.totalConsideredFiles = candidateFiles.size
     optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
     optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
+    if (removedDVs.size > 0) {
+      optimizeStats.deletionVectorStats = Some(DeletionVectorStats(
+        numDeletionVectorsRemoved = removedDVs.size,
+        numDeletionVectorRowsRemoved = removedDVs.map(_.cardinality).sum))
+    }
 
     if (isMultiDimClustering) {
       val inputFileStats =
@@ -236,6 +243,29 @@ class OptimizeExecutor(
     }
   }
 
+  /**
+   * Helper method to prune the list of selected files based on file size and the ratio of
+   * deleted rows according to the deletion vector in [[AddFile]].
+ */ + private def pruneCandidateFileList( + minFileSize: Long, maxDeletedRowsRatio: Double, files: Seq[AddFile]): Seq[AddFile] = { + + // Select all files in case of multi-dimensional clustering + if (isMultiDimClustering) return files + + def shouldCompactBecauseOfDeletedRows(file: AddFile): Boolean = { + // Always compact files with DVs but without numRecords stats. + // This may be overly aggressive, but it fixes the problem in the long-term, + // as the compacted files will have stats. + (file.deletionVector != null && file.numPhysicalRecords.isEmpty) || + file.deletedToPhysicalRecordsRatio.getOrElse(0d) > maxDeletedRowsRatio + } + + // Select files that are small or have too many deleted rows + files.filter( + addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile)) + } + /** * Utility methods to group files into bins for optimize. * @@ -370,7 +400,8 @@ class OptimizeExecutor( private def createMetrics( sparkContext: SparkContext, addedFiles: Seq[AddFile], - removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = { + removedFiles: Seq[RemoveFile], + removedDVs: Seq[DeletionVectorDescriptor]): Map[String, SQLMetric] = { def setAndReturnMetric(description: String, value: Long) = { val metric = createMetric(sparkContext, description) @@ -392,6 +423,25 @@ class OptimizeExecutor( totalSize } + val (deletionVectorRowsRemoved, deletionVectorBytesRemoved) = + removedDVs.map(dv => (dv.cardinality, dv.sizeInBytes.toLong)) + .reduceLeftOption((dv1, dv2) => (dv1._1 + dv2._1, dv1._2 + dv2._2)) + .getOrElse((0L, 0L)) + + val dvMetrics: Map[String, SQLMetric] = Map( + "numDeletionVectorsRemoved" -> + setAndReturnMetric( + "total number of deletion vectors removed", + removedDVs.size), + "numDeletionVectorRowsRemoved" -> + setAndReturnMetric( + "total number of deletion vector rows removed", + deletionVectorRowsRemoved), + "numDeletionVectorBytesRemoved" -> + setAndReturnMetric( + "total number of bytes of removed deletion vectors", + deletionVectorBytesRemoved)) + val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted) Map[String, SQLMetric]( "minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min), @@ -403,6 +453,7 @@ class OptimizeExecutor( "numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size), "numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)), "numRemovedBytes" -> - setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))) + setAndReturnMetric("total number of bytes removed", totalSize(removedFiles)) + ) ++ dvMetrics } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala index 5ae2b67c34d..1c49e3ea144 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala @@ -38,6 +38,7 @@ case class OptimizeStats( var endTimeMs: Long = 0, var totalClusterParallelism: Long = 0, var totalScheduledTasks: Long = 0, + var deletionVectorStats: Option[DeletionVectorStats] = None, var autoCompactParallelismStats: AutoCompactParallelismStats = AutoCompactParallelismStats()) { def toOptimizeMetrics: OptimizeMetrics = { @@ -58,6 +59,7 @@ case class OptimizeStats( endTimeMs = endTimeMs, totalClusterParallelism = totalClusterParallelism, totalScheduledTasks = totalScheduledTasks, + 
+      deletionVectorStats = deletionVectorStats,
       autoCompactParallelismStats = autoCompactParallelismStats.toMetrics)
   }
 }
@@ -224,7 +226,8 @@ case class OptimizeMetrics(
     endTimeMs: Long = 0,
     totalClusterParallelism: Long = 0,
     totalScheduledTasks: Long = 0,
-    autoCompactParallelismStats: Option[ParallelismMetrics] = None
+    autoCompactParallelismStats: Option[ParallelismMetrics] = None,
+    deletionVectorStats: Option[DeletionVectorStats] = None
 )
 
 /**
@@ -253,3 +256,12 @@ case class ParallelismMetrics(
     minClusterActiveParallelism: Option[Long] = None,
     maxSessionActiveParallelism: Option[Long] = None,
     minSessionActiveParallelism: Option[Long] = None)
+
+/**
+ * Accumulator for statistics related to Deletion Vectors.
+ * Note that this case class contains mutable variables and cannot be used where immutable
+ * case classes are expected (e.g. as map/set keys).
+ */
+case class DeletionVectorStats(
+    var numDeletionVectorsRemoved: Long = 0,
+    var numDeletionVectorRowsRemoved: Long = 0)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 33d6d881485..fa443e1fe8f 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1081,6 +1081,16 @@ trait DeltaSQLConfBase {
         |""".stripMargin)
       .booleanConf
       .createWithDefault(false)
+
+  val DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO =
+    buildConf("optimize.maxDeletedRowsRatio")
+      .internal()
+      .doc("Files with a ratio of deleted rows to the total rows larger than this threshold " +
+        "will be rewritten by the OPTIMIZE command.")
+      .doubleConf
+      .checkValue(_ >= 0, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
+      .checkValue(_ <= 1, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
+      .createWithDefault(0.05d)
 }
 
 object DeltaSQLConf extends DeltaSQLConfBase
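
Reviewer note (not part of the diff): a small, self-contained sketch of the
ratio math that ties this threshold to the new AddFile helpers above. The
standalone function below is illustrative and is not code from this patch:

    // numLogicalRecords excludes deleted rows; the DV cardinality supplies
    // numDeletedRecords, mirroring AddFile.deletedToPhysicalRecordsRatio.
    def deletedToPhysicalRecordsRatio(numLogicalRecords: Long, numDeletedRecords: Long): Double =
      1.0d - numLogicalRecords.toDouble / (numLogicalRecords + numDeletedRecords).toDouble

    // A 1000-row file with 60 deleted rows has a ratio of 0.06, which is above the
    // default optimize.maxDeletedRowsRatio of 0.05, so OPTIMIZE rewrites it
    // regardless of its size.
    println(deletedToPhysicalRecordsRatio(numLogicalRecords = 940, numDeletedRecords = 60)) // ~0.06
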
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
index c7d9a44356f..c22a7186a7a 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 import java.io.File
 import java.util.UUID
 
+import org.apache.spark.sql.delta.DeltaOperations.Truncate
 import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile}
 import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -32,6 +33,14 @@ import org.apache.spark.sql.test.SharedSparkSession
 /** Collection of test utilities related with persistent Deletion Vectors. */
 trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
 
+  def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: => Unit): Unit = {
+    test(testName, testTags : _*) {
+      withDeletionVectorsEnabled() {
+        thunk
+      }
+    }
+  }
+
   /** Run a thunk with Deletion Vectors enabled/disabled. */
   def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit): Unit = {
     val enabledStr = enabled.toString
@@ -92,6 +101,31 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
   }
 
   // ======== HELPER METHODS TO WRITE DVs ==========
+  /** Helper method to remove the specified rows in the given file using DVs */
+  protected def removeRowsFromFileUsingDV(
+      log: DeltaLog,
+      addFile: AddFile,
+      rowIds: Seq[Long]): Seq[Action] = {
+    val dv = RoaringBitmapArray(rowIds: _*)
+    writeFileWithDV(log, addFile, dv)
+  }
+
+  /** Utility method to remove a ratio of rows from the given file */
+  protected def deleteRows(
+      log: DeltaLog, file: AddFile, approxPhyRows: Long, ratioOfRowsToDelete: Double): Unit = {
+    val numRowsToDelete =
+      Math.ceil(ratioOfRowsToDelete * file.numPhysicalRecords.getOrElse(approxPhyRows)).toInt
+    removeRowsFromFile(log, file, Seq.range(0, numRowsToDelete))
+  }
+
+  /** Utility method to remove the given rows from the given file using DVs */
+  protected def removeRowsFromFile(
+      log: DeltaLog, addFile: AddFile, rowIndexesToRemove: Seq[Long]): Unit = {
+    val txn = log.startTransaction()
+    val actions = removeRowsFromFileUsingDV(log, addFile, rowIndexesToRemove)
+    txn.commit(actions, Truncate())
+  }
+
   protected def serializeRoaringBitmapArrayWithDefaultFormat(
       dv: RoaringBitmapArray): Array[Byte] = {
     val serializationFormat = RoaringBitmapArrayFormat.Portable
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
index 8ceebedbe40..ad529238f7b 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.JavaConverters._
 
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
+import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -40,6 +42,7 @@ import org.apache.spark.sql.test.SharedSparkSession
  */
 trait OptimizeCompactionSuiteBase extends QueryTest
   with SharedSparkSession
+  with DeletionVectorsTestUtils
   with DeltaColumnMappingTestUtils {
 
   import testImplicits._
@@ -153,6 +156,78 @@ trait OptimizeCompactionSuiteBase extends QueryTest
     }
   }
 
+  for (statsCollectionEnabled <- BOOLEAN_DOMAIN)
+  test(
+      s"optimize command with DVs when statsCollectionEnabled=$statsCollectionEnabled") {
+    withTempDir { tempDir =>
+      val path = tempDir.getAbsolutePath
+      withSQLConf(
+          DeltaSQLConf.DELTA_COLLECT_STATS.key -> statsCollectionEnabled.toString,
+          DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") {
+        // Create 10 files each with 1000 records
+        spark.range(start = 0, end = 10000, step = 1, numPartitions = 10)
+          .toDF("id")
+          .withColumn(colName = "extra", lit("just a random text to fill up the space....."))
+          .write.format("delta").mode("append").save(path) // v0
+
+        val deltaLog = DeltaLog.forTable(spark, path)
+        val filesV0 = deltaLog.unsafeVolatileSnapshot.allFiles.collect()
+        assert(filesV0.size == 10)
+
+        // Default `optimize.maxDeletedRowsRatio` is 0.05.
+        // Delete slightly more than the threshold ratio in two files and less in a third file.
+        val file0 = filesV0(1)
+        val file1 = filesV0(4)
+        val file2 = filesV0(8)
+        deleteRows(deltaLog, file0, approxPhyRows = 1000, ratioOfRowsToDelete = 0.06d) // v1
+        deleteRows(deltaLog, file1, approxPhyRows = 1000, ratioOfRowsToDelete = 0.06d) // v2
+        deleteRows(deltaLog, file2, approxPhyRows = 1000, ratioOfRowsToDelete = 0.01d) // v3
+
+        // Add one small file, so that file selection is based on both the file size and the
+        // deleted rows ratio.
+        spark.range(start = 1, end = 2, step = 1, numPartitions = 1)
+          .toDF("id").withColumn(colName = "extra", lit(""))
+          .write.format("delta").mode("append").save(path) // v4
+        val smallFiles = addedFiles(deltaLog.getChanges(startVersion = 4).next()._2)
+        assert(smallFiles.size == 1)
+
+        // Save the data before optimize to compare it with the data after optimize.
+        val data = spark.read.format("delta").load(path)
+
+        // Set a low value for minFileSize so that the file selection criterion is based on DVs
+        // and not on the file size.
+        val targetSmallSize = smallFiles(0).size + 10 // A number just above the small file size
+        withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> targetSmallSize.toString) {
+          executeOptimizePath(path) // v5
+        }
+        val changes = deltaLog.getChanges(startVersion = 5).next()._2
+
+        // When stats are enabled, we expect the two files whose deleted rows ratio exceeds
+        // the threshold to be compacted. When stats are disabled, we expect all files with
+        // DVs to be compacted.
+        var expectedRemoveFiles = Set(file0.path, file1.path)
+        if (!statsCollectionEnabled) expectedRemoveFiles += file2.path
+        // The small file is always expected to be compacted as well.
+        expectedRemoveFiles += smallFiles(0).path
+
+        assert(removedFiles(changes).map(_.path).toSet === expectedRemoveFiles)
+
+        assert(addedFiles(changes).size == 1) // Expect one new file added
+
+        // Verify the final data after optimization hasn't changed.
+        checkAnswer(spark.read.format("delta").load(path), data)
+      }
+    }
+  }
+
+  private def removedFiles(actions: Seq[Action]): Seq[RemoveFile] = {
+    actions.filter(_.isInstanceOf[RemoveFile]).map(_.asInstanceOf[RemoveFile])
+  }
+
+  private def addedFiles(actions: Seq[Action]): Seq[AddFile] = {
+    actions.filter(_.isInstanceOf[AddFile]).map(_.asInstanceOf[AddFile])
+  }
+
   def appendRowsToDeltaTable(
       path: String,
       numFiles: Int,
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
index 9b5b54fafe8..f39f449ac71 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.delta.optimize
 
 // scalastyle:off import.ordering.noEmptyLine
 import com.databricks.spark.util.Log4jUsageLogger
-import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
-import org.apache.spark.sql.delta.commands.optimize.{FileSizeStats, OptimizeMetrics, ZOrderStats}
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.commands.optimize.{DeletionVectorStats, FileSizeStats, OptimizeMetrics, ZOrderStats}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.JsonUtils
@@ -32,7 +32,8 @@ import org.apache.spark.sql.types._
 
 /** Tests that run optimize and verify the returned output (metrics) is expected. */
 trait OptimizeMetricsSuiteBase extends QueryTest
-  with SharedSparkSession {
+  with SharedSparkSession
+  with DeletionVectorsTestUtils {
 
   import testImplicits._
 
@@ -94,6 +95,10 @@ trait OptimizeMetricsSuiteBase extends QueryTest
       StructField("maxSessionActiveParallelism", LongType, nullable = true),
       StructField("minSessionActiveParallelism", LongType, nullable = true)
     ))
+    val dvMetricsSchema = StructType(Seq(
+      StructField("numDeletionVectorsRemoved", LongType, nullable = false),
+      StructField("numDeletionVectorRowsRemoved", LongType, nullable = false)
+    ))
 
     val optimizeMetricsSchema = StructType(Seq(
       StructField("numFilesAdded", LongType, nullable = false),
@@ -112,7 +117,8 @@ trait OptimizeMetricsSuiteBase extends QueryTest
       StructField("endTimeMs", LongType, nullable = false),
       StructField("totalClusterParallelism", LongType, nullable = false),
       StructField("totalScheduledTasks", LongType, nullable = false),
-      StructField("autoCompactParallelismStats", parallelismMetricsSchema, nullable = true)
+      StructField("autoCompactParallelismStats", parallelismMetricsSchema, nullable = true),
+      StructField("deletionVectorStats", dvMetricsSchema, nullable = true)
     ))
     val optimizeSchema = StructType(Seq(
       StructField("path", StringType, nullable = true),
@@ -157,7 +163,9 @@ trait OptimizeMetricsSuiteBase extends QueryTest
         "maxFileSize",
         "p25FileSize",
         "p50FileSize",
-        "p75FileSize").foreach(metric => assert(actualOperationMetrics.get(metric).isDefined))
+        "p75FileSize",
+        "numDeletionVectorsRemoved"
+      ).foreach(metric => assert(actualOperationMetrics.get(metric).isDefined))
     }
   }
 }
@@ -241,7 +249,8 @@ trait OptimizeMetricsSuiteBase extends QueryTest
          | "maxFileSize" : "${finalSizes.max}",
          | "p25FileSize" : "${finalSizes(finalSizes.length / 4)}",
          | "p50FileSize" : "${finalSizes(finalSizes.length / 2)}",
-         | "p75FileSize" : "${finalSizes(3 * finalSizes.length / 4)}"
+         | "p75FileSize" : "${finalSizes(3 * finalSizes.length / 4)}",
"numDeletionVectorsRemoved" : "0" |}""".stripMargin.trim val expMetrics = JsonUtils.fromJson[Map[String, String]](expMetricsJson) @@ -302,6 +311,58 @@ trait OptimizeMetricsSuiteBase extends QueryTest } } } + + val optimizeCommands = Seq("optimize", "zorder") + for (cmd <- optimizeCommands) { + testWithDVs(s"deletion vector metrics - $cmd") { + withTempDir { dirName => + // Create table with 100 files of 10 rows each. + val numFiles = 100 + val path = dirName.getAbsolutePath + spark.range(0, 1000, step = 1, numPartitions = numFiles) + .write.format("delta").save(path) + val tableName = s"delta.`$path`" + val deltaTable = DeltaTable.forPath(spark, path) + val deltaLog = DeltaLog.forTable(spark, path) + + var allFiles = deltaLog.unsafeVolatileSnapshot.allFiles.collect().toSeq + // Delete some rows to create Deletion Vectors. + val numFilesWithDVs = 20 + val numDeletedRows = numFilesWithDVs * 1 + allFiles.take(numFilesWithDVs).foreach(file => removeRowsFromFile(deltaLog, file, Seq(1))) + + allFiles = deltaLog.unsafeVolatileSnapshot.allFiles.collect().toSeq + assert(allFiles.size === numFiles) + assert(allFiles.filter(_.deletionVector != null).size === numFilesWithDVs) + + val metrics: Seq[OptimizeMetrics] = cmd match { + case "optimize" => + spark.sql(s"OPTIMIZE $tableName") + .select("metrics.*").as[OptimizeMetrics].collect().toSeq + case "zorder" => + spark.sql(s"OPTIMIZE $tableName ZORDER BY (id)") + .select("metrics.*").as[OptimizeMetrics].collect().toSeq + case unknown => throw new IllegalArgumentException(s"Unknown command: $unknown") + } + + // Check DV metrics in the result. + assert(metrics.length === 1) + val dvStats = metrics.head.deletionVectorStats + assert(dvStats.get.numDeletionVectorsRemoved === numFilesWithDVs) + assert(dvStats.get.numDeletionVectorRowsRemoved === numDeletedRows) + + // Check DV metrics in the Delta history. + val opMetrics = deltaTable.history.select("operationMetrics") + .take(1) + .head + .getMap(0) + .asInstanceOf[Map[String, String]] + val dvMetrics = opMetrics.keys.filter(_.contains("DeletionVector")) + assert(dvMetrics === Set("numDeletionVectorsRemoved")) + assert(opMetrics("numDeletionVectorsRemoved") === numFilesWithDVs.toString) + } + } + } } class OptimizeMetricsSuite extends OptimizeMetricsSuiteBase