Support OPTIMIZE on Delta tables with DVs #1578

Closed
wants to merge 1 commit into from
@@ -493,6 +493,21 @@ case class AddFile(
/** Returns the total number of records, including those marked as deleted. */
@JsonIgnore
def numPhysicalRecords: Option[Long] = numLogicalRecords.map(_ + numDeletedRecords)

/** Returns the approximate size of the remaining records after excluding the deleted ones. */
@JsonIgnore
def estLogicalFileSize: Option[Long] = logicalToPhysicalRecordsRatio.map(n => (n * size).toLong)

/** Returns the ratio of the logical number of records to the total number of records. */
@JsonIgnore
def logicalToPhysicalRecordsRatio: Option[Double] = {
numLogicalRecords.map(numLogicalRecords =>
numLogicalRecords.toDouble / (numLogicalRecords + numDeletedRecords).toDouble)
}

/** Returns the ratio of number of deleted records to the total number of records. */
@JsonIgnore
def deletedToPhysicalRecordsRatio: Option[Double] = logicalToPhysicalRecordsRatio.map(1.0d - _)
}

object AddFile {
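To make the new AddFile helpers above easier to follow, here is a plain-number walkthrough of the same arithmetic. It is an illustrative sketch, not code from the change: the object name, the 1 MiB size, and the 1000/60 row counts are invented for the example.

// Illustrative sketch (not from the diff): mirrors numPhysicalRecords,
// logicalToPhysicalRecordsRatio, deletedToPhysicalRecordsRatio and estLogicalFileSize
// for a 1 MiB file with 1000 physical rows, 60 of which are marked deleted by a DV.
object AddFileRatioExample extends App {
  val size: Long = 1024L * 1024L                    // physical file size in bytes
  val numDeletedRecords: Long = 60L                 // rows marked deleted by the DV
  val numLogicalRecords: Option[Long] = Some(940L)  // rows still visible to readers

  val numPhysicalRecords: Option[Long] =
    numLogicalRecords.map(_ + numDeletedRecords)                              // Some(1000)
  val logicalToPhysicalRecordsRatio: Option[Double] =
    numLogicalRecords.map(n => n.toDouble / (n + numDeletedRecords).toDouble) // Some(0.94)
  val deletedToPhysicalRecordsRatio: Option[Double] =
    logicalToPhysicalRecordsRatio.map(1.0d - _)                               // ~0.06
  val estLogicalFileSize: Option[Long] =
    logicalToPhysicalRecordsRatio.map(r => (r * size).toLong)                 // Some(985661)

  println(s"deleted ratio = $deletedToPhysicalRecordsRatio, est. logical size = $estLogicalFileSize")
}

A file like this one, with 6% of its rows deleted, sits just above the default threshold of 0.05 that the OPTIMIZE selection logic below uses.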
@@ -25,7 +25,7 @@ import scala.collection.parallel.immutable.ParVector
import org.apache.spark.sql.delta.skipping.MultiDimClustering
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.actions.{Action, AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, FileAction, RemoveFile}
import org.apache.spark.sql.delta.commands.optimize._
import org.apache.spark.sql.delta.files.SQLMetricsReporting
import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -171,11 +171,12 @@ class OptimizeExecutor(
require(minFileSize > 0, "minFileSize must be > 0")
require(maxFileSize > 0, "maxFileSize must be > 0")

val candidateFiles = txn.filterFiles(partitionPredicate)
val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema

// select all files in case of multi-dimensional clustering
val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
val maxDeletedRowsRatio = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO)
val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
@@ -188,9 +189,10 @@

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
if (addedFiles.size > 0) {
val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
commitAndRetry(txn, operation, updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
val candidateSetOld = candidateFiles.map(_.path).toSet
@@ -218,6 +220,11 @@
optimizeStats.totalConsideredFiles = candidateFiles.size
optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
if (removedDVs.size > 0) {
optimizeStats.deletionVectorStats = Some(DeletionVectorStats(
numDeletionVectorsRemoved = removedDVs.size,
numDeletionVectorRowsRemoved = removedDVs.map(_.cardinality).sum))
}

if (isMultiDimClustering) {
val inputFileStats =
@@ -236,6 +243,29 @@
}
}

/**
* Helper method to prune the list of candidate files based on file size and the ratio of
* deleted rows, according to the deletion vector in [[AddFile]].
*/
private def pruneCandidateFileList(
minFileSize: Long, maxDeletedRowsRatio: Double, files: Seq[AddFile]): Seq[AddFile] = {

// Select all files in case of multi-dimensional clustering
if (isMultiDimClustering) return files

def shouldCompactBecauseOfDeletedRows(file: AddFile): Boolean = {
// Always compact files with DVs but without numRecords stats.
// This may be overly aggressive, but it fixes the problem in the long-term,
// as the compacted files will have stats.
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.deletedToPhysicalRecordsRatio.getOrElse(0d) > maxDeletedRowsRatio
}

// Select files that are small or have too many deleted rows
files.filter(
addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile))
}

/**
* Utility methods to group files into bins for optimize.
*
@@ -370,7 +400,8 @@ class OptimizeExecutor(
private def createMetrics(
sparkContext: SparkContext,
addedFiles: Seq[AddFile],
removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = {
removedFiles: Seq[RemoveFile],
removedDVs: Seq[DeletionVectorDescriptor]): Map[String, SQLMetric] = {

def setAndReturnMetric(description: String, value: Long) = {
val metric = createMetric(sparkContext, description)
@@ -392,6 +423,25 @@
totalSize
}

val (deletionVectorRowsRemoved, deletionVectorBytesRemoved) =
removedDVs.map(dv => (dv.cardinality, dv.sizeInBytes.toLong))
.reduceLeftOption((dv1, dv2) => (dv1._1 + dv2._1, dv1._2 + dv2._2))
.getOrElse((0L, 0L))

val dvMetrics: Map[String, SQLMetric] = Map(
"numDeletionVectorsRemoved" ->
setAndReturnMetric(
"total number of deletion vectors removed",
removedDVs.size),
"numDeletionVectorRowsRemoved" ->
setAndReturnMetric(
"total number of deletion vector rows removed",
deletionVectorRowsRemoved),
"numDeletionVectorBytesRemoved" ->
setAndReturnMetric(
"total number of bytes of removed deletion vectors",
deletionVectorBytesRemoved))

val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted)
Map[String, SQLMetric](
"minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min),
@@ -403,6 +453,7 @@
"numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size),
"numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)),
"numRemovedBytes" ->
setAndReturnMetric("total number of bytes removed", totalSize(removedFiles)))
setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))
) ++ dvMetrics
}
}
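Taken together, pruneCandidateFileList above reduces to a single predicate: rewrite a file when it is smaller than minFileSize, when its deleted-rows ratio exceeds optimize.maxDeletedRowsRatio, or when it carries a DV but has no record-count stats. The following is a minimal standalone sketch of that rule, not code from the change; SimpleFile is an invented stand-in for AddFile, and the sample sizes and ratios are placeholders.

// Minimal sketch of the OPTIMIZE file-selection rule introduced above.
// SimpleFile stands in for AddFile; only the fields the rule needs are modelled.
object PruneExample extends App {
  final case class SimpleFile(
      path: String,
      size: Long,                    // physical file size in bytes
      hasDV: Boolean,                // whether a deletion vector is attached
      deletedRatio: Option[Double])  // None when numRecords stats are missing

  def shouldRewrite(f: SimpleFile, minFileSize: Long, maxDeletedRowsRatio: Double): Boolean =
    f.size < minFileSize ||                               // small file: compact it
      (f.hasDV && f.deletedRatio.isEmpty) ||              // DV but no stats: compact to regain stats
      f.deletedRatio.getOrElse(0d) > maxDeletedRowsRatio  // too many deleted rows: compact

  val files = Seq(
    SimpleFile("a", size = 900L, hasDV = false, deletedRatio = None),        // small
    SimpleFile("b", size = 10000L, hasDV = true, deletedRatio = Some(0.06)), // over threshold
    SimpleFile("c", size = 10000L, hasDV = true, deletedRatio = Some(0.01)), // under threshold
    SimpleFile("d", size = 10000L, hasDV = true, deletedRatio = None))       // DV, no stats

  // With minFileSize = 1024 and the default ratio of 0.05, files a, b and d are selected.
  val selected = files.filter(f => shouldRewrite(f, minFileSize = 1024L, maxDeletedRowsRatio = 0.05))
  println(selected.map(_.path))  // List(a, b, d)
}

Note the asymmetry between the stats-on and stats-off cases, which is what the new OptimizeCompactionSuiteBase test at the end of this diff asserts.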
@@ -38,6 +38,7 @@ case class OptimizeStats(
var endTimeMs: Long = 0,
var totalClusterParallelism: Long = 0,
var totalScheduledTasks: Long = 0,
var deletionVectorStats: Option[DeletionVectorStats] = None,
var autoCompactParallelismStats: AutoCompactParallelismStats = AutoCompactParallelismStats()) {

def toOptimizeMetrics: OptimizeMetrics = {
@@ -58,6 +59,7 @@
endTimeMs = endTimeMs,
totalClusterParallelism = totalClusterParallelism,
totalScheduledTasks = totalScheduledTasks,
deletionVectorStats = deletionVectorStats,
autoCompactParallelismStats = autoCompactParallelismStats.toMetrics)
}
}
@@ -224,7 +226,8 @@ case class OptimizeMetrics(
endTimeMs: Long = 0,
totalClusterParallelism: Long = 0,
totalScheduledTasks: Long = 0,
autoCompactParallelismStats: Option[ParallelismMetrics] = None
autoCompactParallelismStats: Option[ParallelismMetrics] = None,
deletionVectorStats: Option[DeletionVectorStats] = None
)

/**
@@ -253,3 +256,12 @@ case class ParallelismMetrics(
minClusterActiveParallelism: Option[Long] = None,
maxSessionActiveParallelism: Option[Long] = None,
minSessionActiveParallelism: Option[Long] = None)

/**
* Accumulator for statistics related to Deletion Vectors.
* Note that this case class contains mutable fields and must not be used where an immutable
* case class is expected (e.g. as map/set keys).
*/
case class DeletionVectorStats(
var numDeletionVectorsRemoved: Long = 0,
var numDeletionVectorRowsRemoved: Long = 0)
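As a reading aid, the snippet below shows how these removed-DV statistics can be accumulated, mirroring the reduceLeftOption aggregation added to OptimizeExecutor.createMetrics and the counters reported through DeletionVectorStats. It is a sketch with a simplified descriptor; the Dv case class and the sample cardinalities are invented for illustration.

// Sketch (not from the diff): summing rows and bytes over removed deletion vectors.
object DvStatsExample extends App {
  final case class Dv(cardinality: Long, sizeInBytes: Int)  // simplified DeletionVectorDescriptor

  val removedDVs = Seq(Dv(cardinality = 60L, sizeInBytes = 34), Dv(cardinality = 12L, sizeInBytes = 20))

  // Pairwise sum of (rows removed, bytes removed); (0, 0) when no DVs were removed.
  val (rowsRemoved, bytesRemoved) = removedDVs
    .map(dv => (dv.cardinality, dv.sizeInBytes.toLong))
    .reduceLeftOption((a, b) => (a._1 + b._1, a._2 + b._2))
    .getOrElse((0L, 0L))

  println(s"numDeletionVectorsRemoved=${removedDVs.size}, " +
    s"numDeletionVectorRowsRemoved=$rowsRemoved, numDeletionVectorBytesRemoved=$bytesRemoved")
}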
@@ -1081,6 +1081,16 @@ trait DeltaSQLConfBase {
|""".stripMargin)
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO =
buildConf("optimize.maxDeletedRowsRatio")
.internal()
.doc("Files with a ratio of deleted rows to the total rows larger than this threshold " +
"will be rewritten by the OPTIMIZE command.")
.doubleConf
.checkValue(_ >= 0, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
.checkValue(_ <= 1, "maxDeletedRowsRatio must be in range [0.0, 1.0]")
.createWithDefault(0.05d)
}

object DeltaSQLConf extends DeltaSQLConfBase
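Configuration keys defined through DeltaSQLConf are exposed under the spark.databricks.delta. prefix, so the new knob should surface as spark.databricks.delta.optimize.maxDeletedRowsRatio (worth confirming against the build). Below is a usage sketch, not taken from this change; the SparkSession and the table path are placeholders.

// Usage sketch: lower the deleted-rows threshold so more DV-carrying files are rewritten.
// Assumes a SparkSession already configured for Delta and a Delta table at the given path.
import org.apache.spark.sql.SparkSession

def optimizeWithLowDvThreshold(spark: SparkSession, tablePath: String): Unit = {
  // Full key = "spark.databricks.delta." + "optimize.maxDeletedRowsRatio"; default is 0.05.
  spark.conf.set("spark.databricks.delta.optimize.maxDeletedRowsRatio", "0.01")

  // Files whose deleted-rows ratio exceeds 1% now become OPTIMIZE candidates.
  spark.sql(s"OPTIMIZE delta.`$tablePath`").show(truncate = false)
}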
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import java.io.File
import java.util.UUID

import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile}
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -32,6 +33,14 @@ import org.apache.spark.sql.test.SharedSparkSession
/** Collection of test utilities related to persistent Deletion Vectors. */
trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {

def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: => Unit): Unit = {
test(testName, testTags : _*) {
withDeletionVectorsEnabled() {
thunk
}
}
}

/** Run a thunk with Deletion Vectors enabled/disabled. */
def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit): Unit = {
val enabledStr = enabled.toString
@@ -92,6 +101,31 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
}

// ======== HELPER METHODS TO WRITE DVs ==========
/** Helper method to remove the specified rows in the given file using DVs */
protected def removeRowsFromFileUsingDV(
log: DeltaLog,
addFile: AddFile,
rowIds: Seq[Long]): Seq[Action] = {
val dv = RoaringBitmapArray(rowIds: _*)
writeFileWithDV(log, addFile, dv)
}

/** Utility method to remove a ratio of rows from the given file */
protected def deleteRows(
log: DeltaLog, file: AddFile, approxPhyRows: Long, ratioOfRowsToDelete: Double): Unit = {
val numRowsToDelete =
Math.ceil(ratioOfRowsToDelete * file.numPhysicalRecords.getOrElse(approxPhyRows)).toInt
removeRowsFromFile(log, file, Seq.range(0, numRowsToDelete))
}

/** Utility method to remove the given rows from the given file using DVs */
protected def removeRowsFromFile(
log: DeltaLog, addFile: AddFile, rowIndexesToRemove: Seq[Long]): Unit = {
val txn = log.startTransaction()
val actions = removeRowsFromFileUsingDV(log, addFile, rowIndexesToRemove)
txn.commit(actions, Truncate())
}

protected def serializeRoaringBitmapArrayWithDefaultFormat(
dv: RoaringBitmapArray): Array[Byte] = {
val serializationFormat = RoaringBitmapArrayFormat.Portable
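The helpers above are easiest to read next to a concrete caller. Below is a sketch of a suite that mixes in DeletionVectorsTestUtils and drives deleteRows; the suite name, the single-file layout, and the 6% ratio are invented for the example, and the final assertion assumes DV-aware reads behave as elsewhere in this PR.

// Sketch only: exercising deleteRows / removeRowsFromFile from a test suite.
import org.apache.spark.sql.delta.{DeletionVectorsTestUtils, DeltaLog}

class DvHelperUsageExampleSuite extends DeletionVectorsTestUtils {

  testWithDVs("delete a few rows via a DV, then read the table back") {
    withTempDir { dir =>
      val path = dir.getAbsolutePath
      // Write a single 1000-row file so the ratio applies to exactly one AddFile.
      spark.range(start = 0, end = 1000, step = 1, numPartitions = 1)
        .write.format("delta").save(path)

      val log = DeltaLog.forTable(spark, path)
      val file = log.update().allFiles.collect().head

      // Remove roughly 6% of the rows in this file by committing a deletion vector.
      deleteRows(log, file, approxPhyRows = 1000, ratioOfRowsToDelete = 0.06)

      // ceil(0.06 * 1000) = 60 rows are deleted, so 940 remain visible.
      assert(spark.read.format("delta").load(path).count() == 940)
    }
  }
}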
@@ -22,6 +22,8 @@ import scala.collection.JavaConverters._

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -40,6 +42,7 @@
*/
trait OptimizeCompactionSuiteBase extends QueryTest
with SharedSparkSession
with DeletionVectorsTestUtils
with DeltaColumnMappingTestUtils {

import testImplicits._
@@ -153,6 +156,78 @@ trait OptimizeCompactionSuiteBase extends QueryTest
}
}

for (statsCollectionEnabled <- BOOLEAN_DOMAIN)
test(
s"optimize command with DVs when statsCollectionEnabled=$statsCollectionEnabled") {
withTempDir { tempDir =>
val path = tempDir.getAbsolutePath
withSQLConf(
DeltaSQLConf.DELTA_COLLECT_STATS.key -> statsCollectionEnabled.toString,
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") {
// Create 10 files each with 1000 records
spark.range(start = 0, end = 10000, step = 1, numPartitions = 10)
.toDF("id")
.withColumn(colName = "extra", lit("just a random text to fill up the space....."))
.write.format("delta").mode("append").save(path) // v0

val deltaLog = DeltaLog.forTable(spark, path)
val filesV0 = deltaLog.unsafeVolatileSnapshot.allFiles.collect()
assert(filesV0.size == 10)

// Default `optimize.maxDeletedRowsRatio` is 0.05.
// Delete slightly more than the threshold ratio in two files, and less in a third file
val file0 = filesV0(1)
val file1 = filesV0(4)
val file2 = filesV0(8)
deleteRows(deltaLog, file0, approxPhyRows = 1000, ratioOfRowsToDelete = 0.06d) // v1
deleteRows(deltaLog, file1, approxPhyRows = 1000, ratioOfRowsToDelete = 0.06d) // v2
deleteRows(deltaLog, file2, approxPhyRows = 1000, ratioOfRowsToDelete = 0.01d) // v3

// Add one small file, so that the file selection is based on both the file size and
// deleted rows ratio
spark.range(start = 1, end = 2, step = 1, numPartitions = 1)
.toDF("id").withColumn(colName = "extra", lit(""))
.write.format("delta").mode("append").save(path) // v4
val smallFiles = addedFiles(deltaLog.getChanges(startVersion = 4).next()._2)
assert(smallFiles.size == 1)

// Save the data before OPTIMIZE to compare it with the data after OPTIMIZE
val data = spark.read.format("delta").load(path)

// Set a low value for minFileSize so that the file selection criterion is based on DVs
// and not based on the file size.
val targetSmallSize = smallFiles(0).size + 10 // A number just above the small file's size
withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key -> targetSmallSize.toString) {
executeOptimizePath(path) // v5
}
val changes = deltaLog.getChanges(startVersion = 5).next()._2

// When stats are enabled, we expect only the two files whose deleted-rows ratio exceeds
// the threshold to be compacted. When stats are disabled, we expect all files with DVs
// to be compacted.
var expectedRemoveFiles = Set(file0.path, file1.path)
if (!statsCollectionEnabled) expectedRemoveFiles += file2.path
// Expect the small file also to be compacted always
expectedRemoveFiles += smallFiles(0).path

assert(removedFiles(changes).map(_.path).toSet === expectedRemoveFiles)

assert(addedFiles(changes).size == 1) // Expect one new file added

// Verify the final data after optimization hasn't changed.
checkAnswer(spark.read.format("delta").load(path), data)
}
}
}

private def removedFiles(actions: Seq[Action]): Seq[RemoveFile] = {
actions.filter(_.isInstanceOf[RemoveFile]).map(_.asInstanceOf[RemoveFile])
}

private def addedFiles(actions: Seq[Action]): Seq[AddFile] = {
actions.filter(_.isInstanceOf[AddFile]).map(_.asInstanceOf[AddFile])
}

def appendRowsToDeltaTable(
path: String,
numFiles: Int,
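With the changes above in place, the removed-DV counters flow both into the commit's SQL metrics and into the OptimizeMetrics struct returned by the OPTIMIZE command. Here is a sketch of reading them back from the result DataFrame; the (path, metrics) result layout and the nested field names follow OptimizeMetrics and DeletionVectorStats in this diff, and the table path is a placeholder.

// Sketch: surfacing the new deletion-vector statistics from OPTIMIZE's result.
import org.apache.spark.sql.SparkSession

def showOptimizeDvStats(spark: SparkSession, tablePath: String): Unit = {
  val result = spark.sql(s"OPTIMIZE delta.`$tablePath`")
  result.select(
      "metrics.deletionVectorStats.numDeletionVectorsRemoved",
      "metrics.deletionVectorStats.numDeletionVectorRowsRemoved")
    .show(truncate = false)
}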