
address comments
xupefei committed May 7, 2023
1 parent e821d72 commit 2f159c0
Showing 4 changed files with 86 additions and 48 deletions.
7 changes: 4 additions & 3 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -354,7 +354,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
* Creates a [[ReorgTable]] logical plan.
* Examples:
* {{{
* -- Physically delete dropped columns of target table
* -- Physically delete dropped rows and columns of target table
* REORG TABLE (delta.`/path/to/table` | delta_table_name)
* [WHERE partition_predicate] APPLY (PURGE)
* }}}
@@ -364,8 +364,9 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
throw new ParseException("REORG command requires a file path or table name.", ctx)
}

val targetIdentifier = visitMultipartIdentifier(ctx.table)
val targetTable = createUnresolvedTable(targetIdentifier, "REORG")
val targetIdentifier = visitTableIdentifier(ctx.table)
val tableNameParts = targetIdentifier.database.toSeq :+ targetIdentifier.table
val targetTable = createUnresolvedTable(tableNameParts, "REORG")

ReorgTable(targetTable)(Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)
}
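
As an illustrative sketch of the identifier handling introduced above (not part of the commit; the table names are hypothetical): `visitTableIdentifier` returns a Spark `TableIdentifier`, whose optional database and table name are flattened into the multipart name expected by `createUnresolvedTable`.

import org.apache.spark.sql.catalyst.TableIdentifier

// Qualified name, e.g. REORG TABLE my_db.my_table APPLY (PURGE)
val qualified = TableIdentifier("my_table", Some("my_db"))
assert((qualified.database.toSeq :+ qualified.table) == Seq("my_db", "my_table"))

// Unqualified name, e.g. REORG TABLE my_table APPLY (PURGE)
val bare = TableIdentifier("my_table")
assert((bare.database.toSeq :+ bare.table) == Seq("my_table"))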
@@ -110,7 +110,7 @@ case class OptimizeTableCommand(
tableId: Option[TableIdentifier],
userPartitionPredicates: Seq[String],
options: Map[String, String],
isPurge: Boolean = false)(val zOrderBy: Seq[UnresolvedAttribute])
optimizeContext: OptimizeContext = OptimizeContext())(val zOrderBy: Seq[UnresolvedAttribute])
extends OptimizeTableCommandBase with LeafRunnableCommand {

override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
@@ -139,11 +139,37 @@ case class OptimizeTableCommand(
validateZorderByColumns(sparkSession, txn, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq

new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, isPurge)
new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, optimizeContext)
.optimize()
}
}

/**
* Stores all runtime context information that controls the execution of the optimize command.
*
* @param isPurge Whether the rewrite is run only to purge soft-deleted data rather than to
*                compact small files. If [[isPurge]] is true, only files with deletion
*                vectors (DVs) are selected for rewriting.
* @param minFileSize Files smaller than this threshold are selected for compaction. If not
*                    specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE]] is used. This
*                    parameter must be set to `0` when [[isPurge]] is true.
* @param maxDeletedRowsRatio Files whose ratio of soft-deleted rows to total rows exceeds this
*                            threshold are rewritten by the OPTIMIZE command. If not specified,
*                            [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]] is used. This
*                            parameter must be set to `0` when [[isPurge]] is true.
*/
case class OptimizeContext(
isPurge: Boolean = false,
minFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
if (isPurge) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
"minFileSize and maxDeletedRowsRatio must be 0 when running PURGE.")
}
}
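
A minimal usage sketch of the new OptimizeContext guard (not part of the commit; the values mirror those used by ReorgTableCommand further below):

// Purge context: both thresholds must be pinned to zero so that only files
// carrying deletion vectors are rewritten.
val purgeContext = OptimizeContext(
  isPurge = true,
  minFileSize = Some(0L),
  maxDeletedRowsRatio = Some(0d))

// Leaving either threshold unset (or non-zero) while isPurge = true fails the
// require(...) above with an IllegalArgumentException:
// OptimizeContext(isPurge = true) // throws

// Plain compaction context: unset fields fall back to
// DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE and
// DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO inside OptimizeExecutor.
val compactionContext = OptimizeContext()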

/**
* Optimize job which compacts small files into larger files to reduce
* the number of files and potentially allow more efficient reads.
@@ -157,7 +183,7 @@ class OptimizeExecutor(
txn: OptimisticTransaction,
partitionPredicate: Seq[Expression],
zOrderByColumns: Seq[String],
isPurge: Boolean = false)
optimizeContext: OptimizeContext)
extends DeltaCommand with SQLMetricsReporting with Serializable {

/** Timestamp to use in [[FileAction]] */
@@ -167,19 +193,13 @@ class OptimizeExecutor(

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
val maxFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
require(maxFileSize > 0, "maxFileSize must be > 0")
val (minFileSize, maxDeletedRowsRatio) = if (isPurge) {
(0L, 0d) // Only selects files with DV
} else {
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
val maxDeletedRowsRatio = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO)
require(minFileSize > 0, "minFileSize must be > 0")
(minFileSize, maxDeletedRowsRatio)
}
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema

@@ -31,6 +31,9 @@ case class ReorgTable(target: LogicalPlan)(val predicates: Seq[String]) extends
override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil
}

/**
* The PURGE command, i.e. `REORG TABLE ... APPLY (PURGE)`.
*/
case class ReorgTableCommand(target: DeltaTableV2)(val predicates: Seq[String])
extends OptimizeTableCommandBase with LeafCommand with IgnoreCachedData {

@@ -42,7 +45,11 @@ case class ReorgTableCommand(target: DeltaTableV2)(val predicates: Seq[String])
target.catalogTable.map(_.identifier),
predicates,
options = Map.empty,
isPurge = true)(zOrderBy = Nil)
optimizeContext = OptimizeContext(
isPurge = true,
minFileSize = Some(0L),
maxDeletedRowsRatio = Some(0d))
)(zOrderBy = Nil)
command.run(sparkSession)
}
}
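
For orientation, a short end-to-end sketch of how ReorgTableCommand is reached from SQL (not part of the commit; the path and partition values are illustrative, following the parser doc comment above and the test suite below):

// Assuming `spark` has the Delta SQL extensions enabled and a Delta table with
// deletion vectors exists at `path`.
val path = "/path/to/table"

// Rewrite only files that carry deletion vectors, dropping soft-deleted rows.
spark.sql(s"REORG TABLE delta.`$path` APPLY (PURGE)")

// Restrict the rewrite to selected partitions.
spark.sql(s"REORG TABLE delta.`$path` WHERE part IN (0, 2) APPLY (PURGE)")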
@@ -18,7 +18,9 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.functions.col

class DeltaPurgeSuite extends QueryTest
with SharedSparkSession
@@ -34,35 +36,30 @@ class DeltaPurgeSuite extends QueryTest
}
}

testWithDVs("Purge DVs will combine small files") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val log = DeltaLog.forTable(spark, path)
spark
.range(0, 100, 1, numPartitions = 5)
.write
.format("delta")
.save(path)
test("Purge DVs will combine small files") {
val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF
withTempDeltaTable(targetDf) { (_, log) =>
val path = log.dataPath.toString

sql(s"DELETE FROM delta.`$path` WHERE id IN (0, 99)")
assert(log.update().allFiles.filter(_.deletionVector != null).count() === 2)
executePurge(path)
withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "1073741824") { // 1 GB
executePurge(path)
}
val (addFiles, _) = getFileActionsInLastVersion(log)
assert(addFiles.size === 1, "files should be combined")
assert(addFiles.forall(_.deletionVector === null))
checkAnswer(
sql(s"SELECT * FROM delta.`$path`"),
(1 to 98).toDF())
}
}

testWithDVs("Purge DVs") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val log = DeltaLog.forTable(spark, path)
spark
.range(0, 100, 1, numPartitions = 5)
.write
.format("delta")
.save(path)
test("Purge DVs") {
val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF
withTempDeltaTable(targetDf) { (_, log) =>
val path = log.dataPath.toString

sql(s"DELETE FROM delta.`$path` WHERE id IN (0, 99)")
assert(log.update().allFiles.filter(_.deletionVector != null).count() === 2)

@@ -84,18 +81,31 @@ }
}

test("Purge a non-DV table is a noop") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val log = DeltaLog.forTable(spark, path)
spark
.range(0, 100, 1, numPartitions = 5)
.write
.format("delta")
.save(path)
val targetDf = spark.range(0, 100, 1, numPartitions = 5).toDF
withTempDeltaTable(targetDf, enableDVs = false) { (_, log) =>
val versionBefore = log.update().version
executePurge(path)
executePurge(log.dataPath.toString)
val versionAfter = log.update().version
assert(versionBefore === versionAfter)
}
}

test("Purge some partitions of a table with DV") {
val targetDf = spark.range(0, 100, 1, numPartitions = 1)
.withColumn("part", col("id") % 4)
.toDF
withTempDeltaTable(targetDf, partitionBy = Seq("part")) { (_, log) =>
val path = log.dataPath
// Delete one row from each partition
sql(s"DELETE FROM delta.`$path` WHERE id IN (48, 49, 50, 51)")
val (addFiles1, _) = getFileActionsInLastVersion(log)
assert(addFiles1.size === 4)
assert(addFiles1.forall(_.deletionVector !== null))
// PURGE two partitions
sql(s"REORG TABLE delta.`$path` WHERE part IN (0, 2) APPLY (PURGE)")
val (addFiles2, _) = getFileActionsInLastVersion(log)
assert(addFiles2.size === 2)
assert(addFiles2.forall(_.deletionVector === null))
}
}
}
