From 52c221ab697e87c5bb2708e0f3748c57592e70c7 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Thu, 26 Jan 2023 10:34:11 -0800 Subject: [PATCH] Support limit pushdown on Delta tables with DVs GitOrigin-RevId: 6ef96d9f59ce3915020856d67a35422f38c5ae85 --- .../sql/delta/actions/InMemoryLogReplay.scala | 48 +++-- .../spark/sql/delta/actions/actions.scala | 48 +++++ .../sql/delta/stats/DataSkippingReader.scala | 30 ++- .../spark/sql/delta/stats/DeltaScan.scala | 7 +- .../sql/delta/DeletionVectorsTestUtils.scala | 202 ++++++++++++++++++ .../sql/delta/DeltaLimitPushDownSuite.scala | 63 ++++++ 6 files changed, 372 insertions(+), 26 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala index 1af9da4d44d..421f2a88fd9 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.delta.actions import java.net.URI /** - * Replays a history of action, resolving them to produce the current state + * Replays a history of actions, resolving them to produce the current state * of the table. The protocol for resolution is as follows: - * - The most recent [[AddFile]] and accompanying metadata for any `path` wins. + * - The most recent [[AddFile]] and accompanying metadata for any `(path, dv id)` tuple wins. * - [[RemoveFile]] deletes a corresponding [[AddFile]] and is retained as a * tombstone until `minFileRetentionTimestamp` has passed. + * A [[RemoveFile]] "corresponds" to the [[AddFile]] that matches both the parquet file URI + * *and* the deletion vector's URI (if any). * - The most recent version for any `appId` in a [[SetTransaction]] wins. * - The most recent [[Metadata]] wins. * - The most recent [[Protocol]] version wins. - * - For each path, this class should always output only one [[FileAction]] (either [[AddFile]] or - * [[RemoveFile]]) + * - For each `(path, dv id)` tuple, this class should always output only one [[FileAction]] + * (either [[AddFile]] or [[RemoveFile]]) * * This class is not thread safe. 
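+ *
+ * For illustration (hypothetical file and DV names): when a DV is added to `f1.parquet`,
+ * the commit contains an [[AddFile]] keyed as `(f1.parquet, Some(dv1))` together with a
+ * [[RemoveFile]] keyed as `(f1.parquet, None)`, so replay keeps the new DV-carrying entry
+ * active and only tombstones the old DV-less one.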
*/ @@ -36,12 +38,14 @@ class InMemoryLogReplay( minFileRetentionTimestamp: Long, minSetTransactionRetentionTimestamp: Option[Long]) extends LogReplay { - var currentProtocolVersion: Protocol = null - var currentVersion: Long = -1 - var currentMetaData: Metadata = null - val transactions = new scala.collection.mutable.HashMap[String, SetTransaction]() - val activeFiles = new scala.collection.mutable.HashMap[URI, AddFile]() - private val tombstones = new scala.collection.mutable.HashMap[URI, RemoveFile]() + import InMemoryLogReplay._ + + private var currentProtocolVersion: Protocol = null + private var currentVersion: Long = -1 + private var currentMetaData: Metadata = null + private val transactions = new scala.collection.mutable.HashMap[String, SetTransaction]() + private val activeFiles = new scala.collection.mutable.HashMap[UniqueFileActionTuple, AddFile]() + private val tombstones = new scala.collection.mutable.HashMap[UniqueFileActionTuple, RemoveFile]() override def append(version: Long, actions: Iterator[Action]): Unit = { assert(currentVersion == -1 || version == currentVersion + 1, @@ -55,14 +59,16 @@ class InMemoryLogReplay( case a: Protocol => currentProtocolVersion = a case add: AddFile => - activeFiles(add.pathAsUri) = add.copy(dataChange = false) + val uniquePath = UniqueFileActionTuple(add.pathAsUri, add.getDeletionVectorUniqueId) + activeFiles(uniquePath) = add.copy(dataChange = false) // Remove the tombstone to make sure we only output one `FileAction`. - tombstones.remove(add.pathAsUri) + tombstones.remove(uniquePath) case remove: RemoveFile => - activeFiles.remove(remove.pathAsUri) - tombstones(remove.pathAsUri) = remove.copy(dataChange = false) - case ci: CommitInfo => // do nothing - case cdc: AddCDCFile => // do nothing + val uniquePath = UniqueFileActionTuple(remove.pathAsUri, remove.getDeletionVectorUniqueId) + activeFiles.remove(uniquePath) + tombstones(uniquePath) = remove.copy(dataChange = false) + case _: CommitInfo => // do nothing + case _: AddCDCFile => // do nothing case null => // Some crazy future feature. Ignore } } @@ -71,7 +77,7 @@ class InMemoryLogReplay( tombstones.values.filter(_.delTimestamp > minFileRetentionTimestamp) } - private def getTransactions: Iterable[SetTransaction] = { + private[delta] def getTransactions: Iterable[SetTransaction] = { if (minSetTransactionRetentionTimestamp.isEmpty) { transactions.values } else { @@ -88,4 +94,12 @@ class InMemoryLogReplay( getTransactions ++ (activeFiles.values ++ getTombstones).toSeq.sortBy(_.path).iterator } + + /** Returns all [[AddFile]] actions after the Log Replay */ + private[delta] def allFiles: Seq[AddFile] = activeFiles.values.toSeq +} + +object InMemoryLogReplay{ + /** The unit of path uniqueness in delta log actions is the tuple `(parquet file, dv)`. */ + final case class UniqueFileActionTuple(fileURI: URI, deletionVectorURI: Option[String]) } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 93e4841d77b..b790c570f38 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -412,6 +412,30 @@ case class AddFile( removedFile } + /** + * Logically remove rows by associating a `deletionVector` with the file. + * @param deletionVector: The descriptor of the DV that marks rows as deleted. + * @param dataChange: When false, the actions are marked as no-data-change actions. 
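+   *
+   * A usage sketch (with a hypothetical `dv` descriptor marking some of the file's rows
+   * as deleted):
+   * {{{
+   *   val (addWithDV, removeOld) = addFile.removeRows(deletionVector = dv)
+   *   // addWithDV carries `dv`; removeOld tombstones the previous entry for this file
+   * }}}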
+ */ + def removeRows( + deletionVector: DeletionVectorDescriptor, + dataChange: Boolean = true): (AddFile, RemoveFile) = { + val withUpdatedDV = this.copy(deletionVector = deletionVector, dataChange = dataChange) + val addFile = withUpdatedDV + val removeFile = this.removeWithTimestamp(dataChange = dataChange) + (addFile, removeFile) + } + + /** + * Return the unique id of the deletion vector, if present, or `None` if there's no DV. + * + * The unique id differentiates DVs, even if there are multiple in the same file + * or the DV is stored inline. + */ + @JsonIgnore + def getDeletionVectorUniqueId: Option[String] = Option(deletionVector).map(_.uniqueId) + + @JsonIgnore lazy val insertionTime: Long = tag(AddFile.Tags.INSERTION_TIME).map(_.toLong) // From modification time in milliseconds to microseconds. @@ -448,6 +472,7 @@ case class AddFile( val numLogicalRecords = if (node.has("numRecords")) { Some(node.get("numRecords")).filterNot(_.isNull).map(_.asLong()) + .map(_ - numDeletedRecords) } else None Some(ParsedStatsFields( @@ -461,6 +486,13 @@ case class AddFile( override lazy val numLogicalRecords: Option[Long] = parsedStatsFields.flatMap(_.numLogicalRecords) + /** Returns the number of records marked as deleted. */ + @JsonIgnore + def numDeletedRecords: Long = if (deletionVector != null) deletionVector.cardinality else 0L + + /** Returns the total number of records, including those marked as deleted. */ + @JsonIgnore + def numPhysicalRecords: Option[Long] = numLogicalRecords.map(_ + numDeletedRecords) } object AddFile { @@ -539,6 +571,22 @@ case class RemoveFile( @JsonIgnore var numLogicalRecords: Option[Long] = None + /** + * Return the unique id of the deletion vector, if present, or `None` if there's no DV. + * + * The unique id differentiates DVs, even if there are multiple in the same file + * or the DV is stored inline. + */ + @JsonIgnore + def getDeletionVectorUniqueId: Option[String] = Option(deletionVector).map(_.uniqueId) + + /** Returns the number of records marked as deleted. */ + @JsonIgnore + def numDeletedRecords: Long = if (deletionVector != null) deletionVector.cardinality else 0L + + /** Returns the total number of records, including those marked as deleted. */ + @JsonIgnore + def numPhysicalRecords: Option[Long] = numLogicalRecords.map(_ + numDeletedRecords) /** * Create a copy with the new tag. `extendedFileMetadata` is copied unchanged. diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala index ddd98833ba0..28a125ba88e 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala @@ -176,6 +176,7 @@ private[delta] object DataSkippingReader { val sizeCollectorInputEncoders: Seq[Option[ExpressionEncoder[_]]] = Seq( Option(ExpressionEncoder[Boolean]()), Option(ExpressionEncoder[java.lang.Long]()), + Option(ExpressionEncoder[java.lang.Long]()), Option(ExpressionEncoder[java.lang.Long]())) } @@ -444,6 +445,8 @@ trait DataSkippingReaderBase constructDataFilters(And(Not(e1), Not(e2))) // Match any file whose null count is larger than zero. + // Note DVs might result in a redundant read of a file. + // However, they cannot lead to a correctness issue. 
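+    // (Example: if every null-valued row in a file has since been deleted via its DV,
+    // the physical nullCount stat is still greater than zero, so the file is read and
+    // then filtered down to zero matching rows at scan time instead of being skipped.)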
case IsNull(SkippingEligibleColumn(a, _)) => statsProvider.getPredicateWithStatType(a, NULL_COUNT) { nullCount => nullCount > Literal(0L) @@ -452,6 +455,7 @@ trait DataSkippingReaderBase constructDataFilters(IsNotNull(e)) // Match any file whose null count is less than the row count. + // Note When comparing numRecords to nullCount we should NOT take into account DV cardinality case IsNotNull(SkippingEligibleColumn(a, _)) => val nullCountCol = StatsColumn(NULL_COUNT, a) val numRecordsCol = StatsColumn(NUM_RECORDS) @@ -677,6 +681,9 @@ trait DataSkippingReaderBase // caller will negate the expression we return. In case a stats column is NULL, `NOT(expr)` // must return `TRUE`, and without these NULL checks it would instead return // `NOT(NULL)` => `NULL`. + // NOTE: Here we only verify the existence of statistics. Therefore, DVs do not + // cause any issue. Furthermore, the check below NUM_RECORDS === NULL_COUNT should NOT + // take into the DV cardinality. referencedStats.flatMap { stat => stat match { case StatsColumn(MIN, _) | StatsColumn(MAX, _) => Seq(stat, StatsColumn(NULL_COUNT, stat.pathToColumn), StatsColumn(NUM_RECORDS)) @@ -704,10 +711,10 @@ trait DataSkippingReaderBase private def buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column) = { val bytesCompressed = col("size") val rows = getStatsColumnOrNullLiteral(NUM_RECORDS) + val dvCardinality = coalesce(col("deletionVector.cardinality"), lit(0L)) + val logicalRows = rows - dvCardinality as "logicalRows" - val accumulator = new ArrayAccumulator( - 3 - ) + val accumulator = new ArrayAccumulator(4) spark.sparkContext.register(accumulator) @@ -715,11 +722,14 @@ trait DataSkippingReaderBase // `sizeCollectorInputEncoders` value. val collector = (include: Boolean, bytesCompressed: java.lang.Long, + logicalRows: java.lang.Long, rows: java.lang.Long) => { if (include) { accumulator.add((0, bytesCompressed)) /* count bytes of AddFiles */ accumulator.add((1, Option(rows).map(_.toLong).getOrElse(-1L))) /* count rows in AddFiles */ accumulator.add((2, 1)) /* count number of AddFiles */ + accumulator.add((3, Option(logicalRows) + .map(_.toLong).getOrElse(-1L))) /* count logical rows in AddFiles */ } include } @@ -729,7 +739,7 @@ trait DataSkippingReaderBase inputEncoders = sizeCollectorInputEncoders, deterministic = false) - (accumulator, collectorUdf(_: Column, bytesCompressed, rows)) + (accumulator, collectorUdf(_: Column, bytesCompressed, logicalRows, rows)) } override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = { @@ -841,6 +851,9 @@ trait DataSkippingReaderBase /** * Gathers files that should be included in a scan based on the given predicates. * Statistics about the amount of data that will be read are gathered and returned. + * Note, the statistics column that is added when keepNumRecords = true should NOT + * take into account DVs. Consumers of this method might commit the file. The semantics + * of the statistics need to be consistent across all files. 
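+   * For example (hypothetical numbers): a file with 50 physical rows and a DV of
+   * cardinality 10 still reports numRecords = 50 here, exactly like a file without a DV.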
*/ override def filesForScan(filters: Seq[Expression], keepNumRecords: Boolean): DeltaScan = { val startTime = System.currentTimeMillis() @@ -994,13 +1007,15 @@ trait DataSkippingReaderBase val totalDataSize = new DataSize( sizeInBytesIfKnown, None, - numOfFilesIfKnown + numOfFilesIfKnown, + None ) val scannedDataSize = new DataSize( scan.byteSize, scan.numPhysicalRecords, - Some(scan.files.size) + Some(scan.files.size), + scan.numLogicalRecords ) DeltaScan( @@ -1039,7 +1054,8 @@ trait DataSkippingReaderBase "Delta", "DataSkippingReaderEdge.getFilesAndNumRecords") { import org.apache.spark.sql.delta.implicits._ - val numLogicalRecords = col("stats.numRecords") + val dvCardinality = coalesce(col("deletionVector.cardinality"), lit(0L)) + val numLogicalRecords = col("stats.numRecords") - dvCardinality val result = df.withColumn("numPhysicalRecords", col("stats.numRecords")) // Physical .withColumn("numLogicalRecords", numLogicalRecords) // Logical diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala index 0bae3749440..082c3ef6257 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala @@ -39,7 +39,9 @@ case class DataSize( @JsonDeserialize(contentAs = classOf[java.lang.Long]) rows: Option[Long] = None, @JsonDeserialize(contentAs = classOf[java.lang.Long]) - files: Option[Long] = None + files: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + logicalRows: Option[Long] = None ) object DataSize { @@ -47,7 +49,8 @@ object DataSize { DataSize( Option(a.value(0)).filterNot(_ == -1), Option(a.value(1)).filterNot(_ == -1), - Option(a.value(2)).filterNot(_ == -1) + Option(a.value(2)).filterNot(_ == -1), + Option(a.value(3)).filterNot(_ == -1) ) } } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala new file mode 100644 index 00000000000..c7d9a44356f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala @@ -0,0 +1,202 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.io.File +import java.util.UUID + +import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile} +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore +import org.apache.spark.sql.delta.util.PathWithFileSystem +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.test.SharedSparkSession + +/** Collection of test utilities related with persistent Deletion Vectors. 
*/ +trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession { + + /** Run a thunk with Deletion Vectors enabled/disabled. */ + def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit): Unit = { + val enabledStr = enabled.toString + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> enabledStr + ) { + thunk + } + } + + /** Helper to run 'fn' with a temporary Delta table. */ + def withTempDeltaTable( + dataDF: DataFrame, + partitionBy: Seq[String] = Seq.empty, + enableDVs: Boolean = true)(fn: (io.delta.tables.DeltaTable, DeltaLog) => Unit): Unit = { + withTempPath { path => + val tablePath = new Path(path.getAbsolutePath) + dataDF.write + .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, enableDVs.toString) + .partitionBy(partitionBy: _*) + .format("delta") + .save(tablePath.toString) + val targetTable = io.delta.tables.DeltaTable.forPath(tablePath.toString) + val targetLog = DeltaLog.forTable(spark, tablePath) + fn(targetTable, targetLog) + } + } + + /** Helper that verifies whether a defined number of DVs exist */ + def verifyDVsExist(targetLog: DeltaLog, filesWithDVsSize: Int): Unit = { + val filesWithDVs = getFilesWithDeletionVectors(targetLog) + assert(filesWithDVs.size === filesWithDVsSize) + assertDeletionVectorsExist(targetLog, filesWithDVs) + } + + /** Returns all [[AddFile]] actions of a Delta table that contain Deletion Vectors. */ + def getFilesWithDeletionVectors(log: DeltaLog): Seq[AddFile] = + log.unsafeVolatileSnapshot.allFiles.collect().filter(_.deletionVector != null).toSeq + + /** Helper to check that the Deletion Vectors of the provided file actions exist on disk. */ + def assertDeletionVectorsExist(log: DeltaLog, filesWithDVs: Seq[AddFile]): Unit = { + val tablePath = new Path(log.dataPath.toUri.getPath) + for (file <- filesWithDVs) { + val dv = file.deletionVector + assert(dv != null) + assert(dv.isOnDisk && !dv.isInline) + assert(dv.offset.isDefined) + + // Check that DV exists. + val dvPath = dv.absolutePath(tablePath) + val dvPathStr = DeletionVectorStore.pathToString(dvPath) + assert(new File(dvPathStr).exists(), s"DV not found $dvPath") + + // Check that cardinality is correct. + val bitmap = newDVStore.read(dvPath, dv.offset.get, dv.sizeInBytes) + assert(dv.cardinality === bitmap.cardinality) + } + } + + // ======== HELPER METHODS TO WRITE DVs ========== + protected def serializeRoaringBitmapArrayWithDefaultFormat( + dv: RoaringBitmapArray): Array[Byte] = { + val serializationFormat = RoaringBitmapArrayFormat.Portable + dv.serializeAsByteArray(serializationFormat) + } + + /** + * Produce a new [[AddFile]] that will store `dv` in the log using default settings for choosing + * inline or on-disk storage. + * + * Also returns the corresponding [[RemoveFile]] action for `currentFile`. + * + * TODO: Always on-disk for now. Inline support comes later. + */ + protected def writeFileWithDV( + log: DeltaLog, + currentFile: AddFile, + dv: RoaringBitmapArray): Seq[Action] = { + writeFileWithDVOnDisk(log, currentFile, dv) + } + + /** + * Produce a new [[AddFile]] that will reference the `dv` in the log while storing it on-disk. + * + * Also returns the corresponding [[RemoveFile]] action for `currentFile`. 
+ */ + protected def writeFileWithDVOnDisk( + log: DeltaLog, + currentFile: AddFile, + dv: RoaringBitmapArray): Seq[Action] = writeFilesWithDVsOnDisk(log, Seq((currentFile, dv))) + + protected def withDVWriter[T]( + log: DeltaLog, + dvFileID: UUID)(fn: DeletionVectorStore.Writer => T): T = { + val dvStore = newDVStore + // scalastyle:off deltahadoopconfiguration + val conf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val tableWithFS = PathWithFileSystem.withConf(log.dataPath, conf) + val dvPath = + DeletionVectorStore.assembleDeletionVectorPathWithFileSystem(tableWithFS, dvFileID) + val writer = dvStore.createWriter(dvPath) + try { + fn(writer) + } finally { + writer.close() + } + } + + /** + * Produce new [[AddFile]] actions that will reference associated DVs in the log while storing + * all DVs in the same file on-disk. + * + * Also returns the corresponding [[RemoveFile]] actions for the original file entries. + */ + protected def writeFilesWithDVsOnDisk( + log: DeltaLog, + filesWithDVs: Seq[(AddFile, RoaringBitmapArray)]): Seq[Action] = { + val dvFileId = UUID.randomUUID() + withDVWriter(log, dvFileId) { writer => + filesWithDVs.flatMap { case (currentFile, dv) => + val range = writer.write(serializeRoaringBitmapArrayWithDefaultFormat(dv)) + val dvData = DeletionVectorDescriptor.onDiskWithRelativePath( + id = dvFileId, + sizeInBytes = range.length, + cardinality = dv.cardinality, + offset = Some(range.offset)) + val (add, remove) = currentFile.removeRows( + dvData + ) + Seq(add, remove) + } + } + } + + /** + * Removes the `numRowsToRemovePerFile` from each file via DV. + * Returns the total number of rows removed. + */ + protected def removeRowsFromAllFilesInLog( + log: DeltaLog, + numRowsToRemovePerFile: Long): Long = { + var numFiles: Option[Int] = None + // This is needed to make the manual commit work correctly, since we are not actually + // running a command that produces metrics. + withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "false") { + val txn = log.startTransaction() + val allAddFiles = txn.snapshot.allFiles.collect() + numFiles = Some(allAddFiles.length) + val bitmap = RoaringBitmapArray(0L until numRowsToRemovePerFile: _*) + val actions = allAddFiles.flatMap { file => + if (file.numPhysicalRecords.isDefined) { + // Only when stats are enabled. 
Can't check when stats are disabled + assert(file.numPhysicalRecords.get > numRowsToRemovePerFile) + } + writeFileWithDV(log, file, bitmap) + } + txn.commit(actions, DeltaOperations.Delete(predicate = Seq.empty)) + } + numFiles.get * numRowsToRemovePerFile + } + + def newDVStore(): DeletionVectorStore = { + // scalastyle:off deltahadoopconfiguration + DeletionVectorStore.createInstance(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration + } +} diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLimitPushDownSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLimitPushDownSuite.scala index a7d1f45f01f..d15487a5cbd 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLimitPushDownSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLimitPushDownSuite.scala @@ -34,6 +34,7 @@ trait DeltaLimitPushDownTests extends QueryTest with SharedSparkSession with DatabricksLogging with ScanReportHelper + with DeletionVectorsTestUtils with StatsUtils with DeltaSQLCommandTest { @@ -233,6 +234,68 @@ trait DeltaLimitPushDownTests extends QueryTest } } + private def withDVSettings(thunk: => Unit): Unit = { + withSQLConf( + DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false" + ) { + withDeletionVectorsEnabled() { + thunk + } + } + } + + for (statsCollectionEnabled <- BOOLEAN_DOMAIN) { + test(s"Verify limit correctness in the presence of DVs " + + s"statsCollectionEnabled: $statsCollectionEnabled") { + withDVSettings { + withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> statsCollectionEnabled.toString) { + val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 2) + .withColumn("value", col("id")) + + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + removeRowsFromAllFilesInLog(targetLog, numRowsToRemovePerFile = 10) + verifyDVsExist(targetLog, 2) + + val targetDF = targetTable.toDF + + // We have 2 files 50 rows each. We deleted 10 rows from the first file. The first file + // now contains 50 physical rows and 40 logical. Failing to take into account the DVs in + // the first file results into prematurely terminating the scan and returning an + // incorrect result. Note, the corner case in terms of correctness is when the limit is + // set to 50. When statistics collection is disabled, we read both files. + val limitToExpectedNumberOfFilesReadSeq = Range(10, 90, 10) + .map(n => (n, if (n < 50 && statsCollectionEnabled) 1 else 2)) + + for ((limit, expectedNumberOfFilesRead) <- limitToExpectedNumberOfFilesReadSeq) { + val df = targetDF.limit(limit) + + // Assess correctness. + assert(df.count === limit) + + val scanStats = getStats(df) + + // Check we do not read more files than needed. + assert(scanStats.scanned.files === Some(expectedNumberOfFilesRead)) + + // Verify physical and logical rows are updated correctly. + val numDeletedRows = 10 + val numPhysicalRowsPerFile = 50 + val numTotalPhysicalRows = numPhysicalRowsPerFile * expectedNumberOfFilesRead + val numTotalLogicalRows = numTotalPhysicalRows - + (numDeletedRows * expectedNumberOfFilesRead) + val expectedNumTotalPhysicalRows = + if (statsCollectionEnabled) Some(numTotalPhysicalRows) else None + val expectedNumTotalLogicalRows = + if (statsCollectionEnabled) Some(numTotalLogicalRows) else None + + assert(scanStats.scanned.rows === expectedNumTotalPhysicalRows) + assert(scanStats.scanned.logicalRows === expectedNumTotalLogicalRows) + } + } + } + } + } + } } class DeltaLimitPushDownV1Suite extends DeltaLimitPushDownTests
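A standalone sketch of the counting argument behind the corner case above (plain Scala, not the actual pushdown code): two files with 50 physical rows each and 10 rows deleted per file via DVs leave 40 logical rows per file, so the scan must accumulate logical rather than physical row counts before it stops taking files.

object LimitPushdownCornerCase {
  val physicalRowsPerFile = 50L
  val deletedRowsPerFile = 10L
  val logicalRowsPerFile = physicalRowsPerFile - deletedRowsPerFile // 40 rows visible to readers

  // With stats collected, files are taken until the accumulated *logical* row count reaches
  // the limit; counting physical rows would stop one file too early at LIMIT 50.
  def filesNeeded(limit: Long): Int =
    if (limit <= logicalRowsPerFile) 1 else 2

  def main(args: Array[String]): Unit = {
    assert(filesNeeded(40) == 1) // a single file already yields 40 logical rows
    assert(filesNeeded(50) == 2) // 50 physical rows in file 1, but only 40 survive the DV
  }
}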