Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
  • Loading branch information
rajeshparangi committed Sep 13, 2024
1 parent 74d19a5 commit 5e3ec29
Showing 1 changed file with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
None
}
}
.map { f =>
// Convert each listed path to its URL-encoded string form.
SerializableFileStatus(pathStringtoUrlEncodedString(f.path), f.length, f.isDir,
f.modificationTime)
}
}

/**
Expand Down Expand Up @@ -273,6 +278,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
),
fileListingParallelism = Option(parallelism)
)
.map { f =>
// Convert each listed path to its URL-encoded string form.
SerializableFileStatus(pathStringtoUrlEncodedString(f.path), f.length, f.isDir,
f.modificationTime)
}
}
val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
.mapGroups { (k, v) =>
Expand All @@ -299,6 +309,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
// 5. We subtract all the valid files and tombstones in our state
// 6. We filter all paths with a count of 1, which will correspond to files not in the
// state, and empty directories. We can safely delete all of these
val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded
val diff = allFilesAndDirs
.where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
.mapPartitions { fileStatusIterator =>
Expand All @@ -307,16 +318,18 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
fileStatusIterator.flatMap { fileStatus =>
if (fileStatus.isDir) {
Iterator.single(FileNameAndSize(
relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
relativize(urlEncodedStringToPath(fileStatus.path), fs,
reservoirBase, isDir = true), 0L))
} else {
val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
val dirs = getAllSubdirs(canonicalizedBasePath, fileStatus.path, fs)
val dirsWithSlash = dirs.map { p =>
val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
val relativizedPath = relativize(urlEncodedStringToPath(p), fs,
reservoirBase, isDir = true)
FileNameAndSize(relativizedPath, 0L)
}
dirsWithSlash ++ Iterator(
FileNameAndSize(relativize(
fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
urlEncodedStringToPath(fileStatus.path), fs, reservoirBase, isDir = false),
fileStatus.length))
}
}
Expand All @@ -337,9 +350,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
.select(col("path"))
.as[String]
.map { relativePath =>
assert(!stringToPath(relativePath).isAbsolute,
assert(!urlEncodedStringToPath(relativePath).isAbsolute,
"Shouldn't have any absolute paths for deletion here.")
pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
pathToUrlEncodedString(DeltaFileOperations.absolutePath(basePath, relativePath))
}
val timeTakenToIdentifyEligibleFiles =
System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
Expand Down Expand Up @@ -369,7 +382,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.STATS, stats)}")

return diffFiles.map(f => stringToPath(f).toString).toDF("path")
return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
}
logVacuumStart(
spark,
Expand Down Expand Up @@ -574,7 +587,7 @@ trait VacuumCommandImpl extends DeltaCommand {
fs: FileSystem,
reservoirBase: Path,
isDir: Boolean): String = {
pathToString(DeltaFileOperations.tryRelativizePath(fs, reservoirBase, path))
pathToUrlEncodedString(DeltaFileOperations.tryRelativizePath(fs, reservoirBase, path))
}

/**
Expand All @@ -601,21 +614,22 @@ trait VacuumCommandImpl extends DeltaCommand {
diff.repartition(parallelPartitions).mapPartitions { files =>
val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
val filesDeletedPerPartition =
files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
files.map(p => urlEncodedStringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
Iterator(filesDeletedPerPartition)
}.collect().sum
} else {
val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
val fileResultSet = diff.toLocalIterator().asScala
fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
fileResultSet.map(p => urlEncodedStringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
}
}

// scalastyle:off pathfromuri
/**
 * Parses `path` as a [[URI]] and wraps the result in a [[Path]].
 *
 * @param path string form of a URI
 * @return a [[Path]] constructed from the parsed URI
 */
protected def stringToPath(path: String): Path = {
  val parsedUri = new URI(path)
  new Path(parsedUri)
}
// scalastyle:on pathfromuri
/**
 * Turns a path string into a [[Path]] by going through `SparkPath.fromUrlString`.
 *
 * @param path path string; as the helper name indicates, it is expected to be URL-encoded
 * @return the [[Path]] produced by `SparkPath#toPath`
 */
protected def urlEncodedStringToPath(path: String): Path = {
  val encodedPath = SparkPath.fromUrlString(path)
  encodedPath.toPath
}

/**
 * Renders a [[Path]] as a string by wrapping it with `SparkPath.fromPath` and
 * taking the wrapper's `toString`.
 *
 * @param path the path to render
 * @return the string form of the wrapped path (URL-encoded, going by the helper name)
 */
protected def pathToUrlEncodedString(path: Path): String = {
  val wrappedPath = SparkPath.fromPath(path)
  wrappedPath.toString
}

/**
 * Renders a [[Path]] as the string form of its URI.
 *
 * @param path the path to render
 * @return `path.toUri` converted to a string
 */
protected def pathToString(path: Path): String = {
  val pathUri = path.toUri
  pathUri.toString
}
/**
 * Converts a path string into the string form produced by
 * `SparkPath.fromPathString(path).toString`.
 *
 * The result type is declared explicitly so this helper's signature matches the
 * sibling conversion helpers and does not depend on type inference.
 *
 * @param path path string, handed to `SparkPath.fromPathString` unchanged
 * @return the string form of the resulting `SparkPath` (URL-encoded, going by the helper name)
 */
protected def pathStringtoUrlEncodedString(path: String): String =
  SparkPath.fromPathString(path).toString

/** Returns the relative path of a file action or None if the file lives outside of the table. */
protected def getActionRelativePath(
Expand All @@ -631,18 +645,18 @@ trait VacuumCommandImpl extends DeltaCommand {
fs: FileSystem,
basePath: Path,
relativizeIgnoreError: Boolean): Option[String] = {
val filePath = stringToPath(path)
val filePath = urlEncodedStringToPath(path)
if (filePath.isAbsolute) {
val maybeRelative =
DeltaFileOperations.tryRelativizePath(fs, basePath, filePath, relativizeIgnoreError)
if (maybeRelative.isAbsolute) {
// This file lives outside the directory of the table.
None
} else {
Some(pathToString(maybeRelative))
Some(pathToUrlEncodedString(maybeRelative))
}
} else {
Some(pathToString(filePath))
Some(pathToUrlEncodedString(filePath))
}
}

Expand Down Expand Up @@ -686,7 +700,7 @@ trait VacuumCommandImpl extends DeltaCommand {
case Some(dv) if dv.isOnDisk =>
if (dv.isRelative) {
// We actually want a relative path here.
Some((pathToString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
Some((pathToUrlEncodedString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
} else {
assert(dv.isAbsolute)
// This is never going to be a path relative to `basePath` for DVs.
Expand Down

0 comments on commit 5e3ec29

Please sign in to comment.