diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index c06c605f994..7058ccfb3d7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -149,6 +149,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       clock: Clock = new SystemClock): DataFrame = {
     recordDeltaOperation(deltaLog, "delta.gc") {
 
+      val vacuumStartTime = System.currentTimeMillis()
       val path = deltaLog.dataPath
       val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
       val fs = path.getFileSystem(deltaHadoopConf)
@@ -210,6 +211,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         org.apache.spark.sql.Encoders.product[FileNameAndSize]
 
       val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
+      val filesAndDirsPresentBeforeDelete = allFilesAndDirs.count()
 
       // The logic below is as follows:
       // 1. We take all the files and directories listed in our reservoir
@@ -264,6 +266,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
       val timeTakenToIdentifyEligibleFiles =
         System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
 
+      val numFiles = diffFiles.count()
       if (dryRun) {
         val stats = DeltaVacuumStats(
@@ -272,14 +275,19 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
           defaultRetentionMillis = snapshotTombstoneRetentionMillis,
           minRetainedTimestamp = deleteBeforeTimestamp,
           dirsPresentBeforeDelete = dirCounts,
+          filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
           objectsDeleted = numFiles,
           sizeOfDataToDelete = sizeOfDataToDelete,
           timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-          timeTakenForDelete = 0L)
+          timeTakenForDelete = 0L,
+          vacuumStartTime = vacuumStartTime,
+          vacuumEndTime = System.currentTimeMillis,
+          numPartitionColumns = partitionColumns.size
+        )
         recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-        logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
-          s"a total of $dirCounts directories that are safe to delete.")
+        logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
+          s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats")
 
         return diffFiles.map(f => stringToPath(f).toString).toDF("path")
       }
 
@@ -308,12 +316,18 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         defaultRetentionMillis = snapshotTombstoneRetentionMillis,
         minRetainedTimestamp = deleteBeforeTimestamp,
         dirsPresentBeforeDelete = dirCounts,
+        filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
         objectsDeleted = filesDeleted,
         sizeOfDataToDelete = sizeOfDataToDelete,
         timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-        timeTakenForDelete = timeTakenForDelete)
+        timeTakenForDelete = timeTakenForDelete,
+        vacuumStartTime = vacuumStartTime,
+        vacuumEndTime = System.currentTimeMillis,
+        numPartitionColumns = partitionColumns.size)
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
       logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
+      logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " +
+        s"a total of $dirCounts directories. Vacuum stats: $stats")
 
       spark.createDataset(Seq(basePath)).toDF("path")
 
@@ -576,7 +590,12 @@ case class DeltaVacuumStats(
     defaultRetentionMillis: Long,
     minRetainedTimestamp: Long,
     dirsPresentBeforeDelete: Long,
+    filesAndDirsPresentBeforeDelete: Long,
     objectsDeleted: Long,
     sizeOfDataToDelete: Long,
     timeTakenToIdentifyEligibleFiles: Long,
-    timeTakenForDelete: Long)
+    timeTakenForDelete: Long,
+    vacuumStartTime: Long,
+    vacuumEndTime: Long,
+    numPartitionColumns: Long
+)