
Commit cda0387

first

rajeshparangi committed Aug 16, 2024
1 parent 597950c commit cda0387
Showing 2 changed files with 54 additions and 17 deletions.
spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.delta.commands
 
 // scalastyle:off import.ordering.noEmptyLine
+import java.io.File
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.TimeUnit
@@ -25,7 +26,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.conf.Configuration
@@ -38,7 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.functions.{col, count, lit, replace, startswith, substr, sum}
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
-import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
+import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock, Utils}
 
 /**
  * Vacuums the table by clearing all untracked files and folders within this table.
@@ -485,11 +486,12 @@ trait VacuumCommandImpl extends DeltaCommand {
       metrics("numFilesToDelete").set(diff.count())
       metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
       txn.registerSQLMetrics(spark, metrics)
-      txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
+      val version = txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
         checkEnabled,
         specifiedRetentionMillis,
         defaultRetentionMillis
       ))
+      setCommitClock(deltaLog, version)
     }
   }
 
@@ -529,16 +531,25 @@ trait VacuumCommandImpl extends DeltaCommand {
         metrics("numVacuumedDirectories").set(dirCounts.get)
         txn.registerSQLMetrics(spark, metrics)
       }
-      txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
+      val version = txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
         status
       ))
+      setCommitClock(deltaLog, version)
     }
 
     if (filesDeleted.nonEmpty) {
       logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
         s"of ${dirCounts.get} directories.")
     }
   }
+  protected def setCommitClock(deltaLog: DeltaLog, version: Long) = {
+    // This is done to make sure that the commit timestamp reflects the one provided by the clock
+    // object.
+    if (Utils.isTesting) {
+      val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
+      f.setLastModified(deltaLog.clock.getTimeMillis())
+    }
+  }
 
   /**
    * Attempts to relativize the `path` with respect to the `reservoirBase` and converts the path to
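For context on the two hunks above: both the VACUUM START and VACUUM END commits now capture the version returned by txn.commit, and in test runs (Utils.isTesting) setCommitClock pins the new commit file's modification time to the DeltaLog's clock. Retention checks of this kind compare file modification times against the injected clock, so without the pinning a test's outcome would drift with wall-clock time. A minimal standalone sketch of that idea, assuming an illustrative 7-day retention window (the names below are not the Delta implementation):

    import java.nio.file.Files
    import org.apache.spark.util.ManualClock

    object CommitClockSketch {
      // Illustrative retention window; Delta's actual window is configurable.
      val retentionMs: Long = 7L * 24 * 60 * 60 * 1000

      def main(args: Array[String]): Unit = {
        val clock = new ManualClock(1000L)
        val commitFile = Files.createTempFile("00000000000000000001", ".json").toFile
        // Pin the on-disk timestamp to the logical test clock, as setCommitClock does.
        commitFile.setLastModified(clock.getTimeMillis())
        // Advance logical time past the retention window.
        clock.advance(retentionMs + 1)
        // A retention check driven by the same clock is now deterministic:
        assert(commitFile.lastModified() <= clock.getTimeMillis() - retentionMs)
      }
    }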
@@ -598,7 +609,15 @@ trait VacuumCommandImpl extends DeltaCommand {
       fs: FileSystem,
       basePath: Path,
       relativizeIgnoreError: Boolean): Option[String] = {
-    val filePath = stringToPath(action.path)
+    getRelativePath(action.path, fs, basePath, relativizeIgnoreError)
+  }
+  /** Returns the relative path of a file or None if the file lives outside of the table. */
+  protected def getRelativePath(
+      path: String,
+      fs: FileSystem,
+      basePath: Path,
+      relativizeIgnoreError: Boolean): Option[String] = {
+    val filePath = stringToPath(path)
     if (filePath.isAbsolute) {
       val maybeRelative =
         DeltaFileOperations.tryRelativizePath(fs, basePath, filePath, relativizeIgnoreError)
@@ -631,16 +650,16 @@
     }.getOrElse(Seq.empty)
 
     val deletionVectorPath =
-      getDeletionVectorRelativePath(action).map(pathToString)
+      getDeletionVectorRelativePathAndSize(action).map(_._1)
 
     paths ++ deletionVectorPath.toSeq
   }
 
   /**
    * Returns the path of the on-disk deletion vector if it is stored relative to the
-   * `basePath` otherwise `None`.
+   * `basePath` and its size, otherwise `None`.
    */
-  protected def getDeletionVectorRelativePath(action: FileAction): Option[Path] = {
+  protected def getDeletionVectorRelativePathAndSize(action: FileAction): Option[(String, Long)] = {
     val dv = action match {
       case a: AddFile if a.deletionVector != null =>
         Some(a.deletionVector)
@@ -653,7 +672,7 @@
       case Some(dv) if dv.isOnDisk =>
         if (dv.isRelative) {
           // We actually want a relative path here.
-          Some(dv.absolutePath(new Path(".")))
+          Some((pathToString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
         } else {
           assert(dv.isAbsolute)
           // This is never going to be a path relative to `basePath` for DVs.
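Note on the deletion-vector hunks: getDeletionVectorRelativePath was widened into getDeletionVectorRelativePathAndSize so callers get the DV's size alongside its table-relative path, with no extra file-system call. A simplified, hypothetical model of that shape (the real type is Delta's DeletionVectorDescriptor, whose fields appear above):

    // Hypothetical stand-in for the descriptor fields used above.
    case class DvInfo(isOnDisk: Boolean, isRelative: Boolean, path: String, sizeInBytes: Long)

    def dvRelativePathAndSize(dv: Option[DvInfo]): Option[(String, Long)] = dv match {
      // Only DVs materialized on disk with table-relative paths are returned.
      case Some(d) if d.isOnDisk && d.isRelative => Some((d.path, d.sizeInBytes))
      case _ => None
    }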
spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
@@ -161,13 +161,15 @@ trait DeltaVacuumSuiteBase extends QueryTest
       expectedError: Class[T],
       msg: Seq[String]) extends Operation
 
+  private final val RANDOM_FILE_CONTENT = "gibberish"
+
   protected def createFile(
       reservoirBase: String,
       filePath: String,
       file: File,
       clock: ManualClock,
       partitionValues: Map[String, String] = Map.empty): AddFile = {
-    FileUtils.write(file, "gibberish")
+    FileUtils.write(file, RANDOM_FILE_CONTENT)
     file.setLastModified(clock.getTimeMillis())
     createTestAddFile(
       encodedPath = filePath,
@@ -188,11 +190,13 @@ trait DeltaVacuumSuiteBase extends QueryTest
     if (commit) {
       if (!DeltaTableUtils.isDeltaTable(spark, new Path(basePath))) {
         // initialize the table
-        deltaLog.startTransaction().commitManually()
+        val version = deltaLog.startTransaction().commitManually()
+        setCommitClock(deltaLog, version, clock)
       }
       val txn = deltaLog.startTransaction()
       val action = createFile(basePath, sanitizedPath, file, clock, partitionValues)
-      txn.commit(Seq(action), Write(SaveMode.Append))
+      val version = txn.commit(Seq(action), Write(SaveMode.Append))
+      setCommitClock(deltaLog, version, clock)
     } else {
       createFile(basePath, path, file, clock)
     }
@@ -213,8 +217,10 @@ trait DeltaVacuumSuiteBase extends QueryTest
         )
         txn.registerSQLMetrics(spark, metrics)
         val encodedPath = new Path(path).toUri.toString
-        txn.commit(Seq(RemoveFile(encodedPath, Option(clock.getTimeMillis()))),
+        val size = Some(RANDOM_FILE_CONTENT.length.toLong)
+        val version = txn.commit(Seq(RemoveFile(encodedPath, Option(clock.getTimeMillis()), size = size)),
           Delete(Seq(Literal.TrueLiteral)))
+        setCommitClock(deltaLog, version, clock)
         // scalastyle:on
       case e: ExecuteVacuumInSQL =>
         Given(s"*** Executing SQL: ${e.sql}")
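The RemoveFile action in the hunk above now carries the removed file's size, taken from the fixed test content, which gives size-based VACUUM metrics a predictable value to assert against. A small illustration of that bookkeeping, with a simplified stand-in for the action type:

    // Simplified stand-in for RemoveFile; only the fields used here.
    final case class RemoveLite(path: String, deletionTimestamp: Option[Long], size: Option[Long])

    val content = "gibberish" // mirrors RANDOM_FILE_CONTENT
    val removes = Seq(RemoveLite("file1.txt", Some(0L), Some(content.length.toLong)))
    // Sum the recorded sizes; actions without a size contribute nothing.
    val totalBytesRemoved = removes.flatMap(_.size).sum // 9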
@@ -340,6 +346,11 @@ trait DeltaVacuumSuiteBase extends QueryTest
     changes.flatMap(_._2).collect { case a: AddCDCFile => a }.toList
   }
 
+  protected def setCommitClock(deltaLog: DeltaLog, version: Long, clock: ManualClock) = {
+    val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
+    f.setLastModified(clock.getTimeMillis())
+  }
+
   protected def testCDCVacuumForUpdateMerge(): Unit = {
     withSQLConf(
       DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true",
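Both setCommitClock helpers locate the commit file via FileNames.unsafeDeltaFile(deltaLog.logPath, version). In the standard Delta log layout, version N of the log is a JSON file whose name is the version zero-padded to 20 digits, which is what makes the file straightforward to resolve and touch:

    // Sketch of the Delta log naming convention the helper relies on.
    def deltaFileName(version: Long): String = f"$version%020d.json"

    assert(deltaFileName(0L) == "00000000000000000000.json")
    assert(deltaFileName(7L) == "00000000000000000007.json")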
@@ -578,7 +589,9 @@ class DeltaVacuumSuite
       val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType)
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       gcTest(deltaLog, clock)(
         CreateFile("file1.txt", commitToActionLog = true, Map("_underscore_col_" -> "10")),
         CreateFile("_underscore_col_=10/test.txt", true, Map("_underscore_col_" -> "10")),
@@ -599,7 +612,9 @@
       val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType)
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       val inventorySchema = StructType(
         Seq(
           StructField("file", StringType),
@@ -631,7 +646,9 @@
       // Vacuum should consider partition folders even for clean up even though it starts with `_`
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       // Create a Seq of Rows containing the data
       val data = Seq(
         Row(s"${deltaLog.dataPath}", 300000L, true, 0L),
@@ -1258,6 +1275,7 @@ class DeltaVacuumSuite
     withEnvironment { (dir, clock) =>
       spark.range(2).write.format("delta").save(dir.getAbsolutePath)
       val deltaLog = DeltaLog.forTable(spark, dir, clock)
+      setCommitClock(deltaLog, 0L, clock)
       val expectedReturn = if (isDryRun) {
         // dry run returns files that will be deleted
         Seq(new Path(dir.getAbsolutePath, "file1.txt").toString)
