Skip to content

Commit

Permalink
[SPARK-17235][SQL] Support purging of old logs in MetadataLog
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
This patch adds a purge interface to MetadataLog, and an implementation in HDFSMetadataLog. The purge function is currently unused, but I will use it to purge old execution and file source logs in follow-up patches. These changes are required in a production structured streaming job that runs for a long period of time.

## How was this patch tested?
Added a unit test case in HDFSMetadataLogSuite.

Author: petermaxlee <[email protected]>

Closes #14802 from petermaxlee/SPARK-17235.
  • Loading branch information
petermaxlee authored and rxin committed Aug 26, 2016
1 parent a11d10f commit f64a1dd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
None
}

/**
 * Deletes the metadata log file of every batch whose id is strictly smaller than
 * `thresholdBatchId`; the batch with id `thresholdBatchId` itself is kept.
 *
 * @param thresholdBatchId exclusive upper bound of the batch ids to remove
 */
override def purge(thresholdBatchId: Long): Unit = {
  // Resolve the batch id of every batch file in the metadata directory, then keep only
  // the ids that fall below the threshold.
  val expiredBatchIds = fileManager
    .list(metadataPath, batchFilesFilter)
    .map(status => pathToBatchId(status.getPath))
    .filter(_ < thresholdBatchId)

  expiredBatchIds.foreach { expiredId =>
    val path = batchIdToPath(expiredId)
    fileManager.delete(path)
    logTrace(s"Removed metadata log file: $path")
  }
}

private def createFileManager(): FileManager = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ trait MetadataLog[T] {
* Return the latest batch Id and its metadata if exist.
*/
def getLatest(): Option[(Long, T)]

/**
* Removes all log entries whose batch id is earlier than `thresholdBatchId` (exclusive), i.e.
* the entry for `thresholdBatchId` itself is not removed.
* This operation should be idempotent.
*
* @param thresholdBatchId exclusive upper bound of the batch ids to remove
*/
def purge(thresholdBatchId: Long): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
// Runs the basic FileManager test case against a FileContextManager rooted at a fresh
// temporary directory.
// NOTE(review): both a testManager and a testFileManager call appear below; this looks like the
// before/after pair of the helper rename — confirm that only one of them belongs in the file.
test("FileManager: FileContextManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
testManager(path, new FileContextManager(path, new Configuration))
testFileManager(path, new FileContextManager(path, new Configuration))
}
}

// Runs the basic FileManager test case against a FileSystemManager rooted at a fresh
// temporary directory.
// NOTE(review): both a testManager and a testFileManager call appear below; this looks like the
// before/after pair of the helper rename — confirm that only one of them belongs in the file.
test("FileManager: FileSystemManager") {
withTempDir { temp =>
val path = new Path(temp.getAbsolutePath)
testManager(path, new FileSystemManager(path, new Configuration))
testFileManager(path, new FileSystemManager(path, new Configuration))
}
}

Expand Down Expand Up @@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}

// Verifies that purge removes exactly the batches below the threshold, leaves the log untouched
// when nothing is below the threshold, and can be repeated with the same result.
testWithUninterruptibleThread("HDFSMetadataLog: purge") {
  withTempDir { temp =>
    val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
    assert(metadataLog.add(0, "batch0"))
    assert(metadataLog.add(1, "batch1"))
    assert(metadataLog.add(2, "batch2"))
    assert(metadataLog.get(0).isDefined)
    assert(metadataLog.get(1).isDefined)
    assert(metadataLog.get(2).isDefined)
    assert(metadataLog.getLatest().get._1 == 2)

    // The threshold is exclusive, so purging at the oldest batch id must remove nothing.
    metadataLog.purge(0)
    assert(metadataLog.get(0).isDefined)
    assert(metadataLog.get(1).isDefined)
    assert(metadataLog.get(2).isDefined)
    assert(metadataLog.getLatest().get._1 == 2)

    // purge is documented as idempotent: running it a second time with the same threshold
    // must succeed and leave the log in the same state as the first run.
    for (_ <- 1 to 2) {
      metadataLog.purge(2)
      assert(metadataLog.get(0).isEmpty)
      assert(metadataLog.get(1).isEmpty)
      assert(metadataLog.get(2).isDefined)
      assert(metadataLog.getLatest().get._1 == 2)
    }
  }
}

testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
Expand Down Expand Up @@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}


def testManager(basePath: Path, fm: FileManager): Unit = {
/** Basic test case for [[FileManager]] implementation. */
private def testFileManager(basePath: Path, fm: FileManager): Unit = {
// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
assert(!fm.exists(dir))
Expand Down

0 comments on commit f64a1dd

Please sign in to comment.