check access when reading and use blacklist
mgaido91 committed Aug 1, 2018
1 parent 2ad5285 commit aec9b86
Showing 2 changed files with 92 additions and 82 deletions.
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,24 +21,23 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.io.Source
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
-import com.google.common.cache.CacheBuilder
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
-import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
+import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -81,8 +80,8 @@ import org.apache.spark.util.kvstore._
  * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
  * maintains this invariant.
  */
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with CachedFileSystemHelper with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
 
   def this(conf: SparkConf) = {
     this(conf, new SystemClock())
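
The constructor change above (from `clock: Clock` to `protected val clock: Clock`) is what lets the new `LogFilesBlacklisting` trait, defined later in this diff, see the provider's clock: the trait declares an abstract `protected def clock: Clock`, and in Scala a stable `val` (including a `val` constructor parameter) may implement an abstract `def`. A minimal sketch of the pattern, using hypothetical stand-in names rather than Spark's real classes:

// Hypothetical stand-ins for org.apache.spark.util.{Clock, SystemClock}.
trait Clock { def getTimeMillis(): Long }
class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

trait NeedsClock {
  // Abstract member; the concrete class must supply a clock.
  protected def clock: Clock
}

// The `protected val` parameter satisfies the trait's abstract `def`.
class Provider(protected val clock: Clock) extends NeedsClock {
  def now(): Long = clock.getTimeMillis()
}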
@@ -114,10 +113,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
 
-  protected val expireTimeInSeconds = conf.get(MAX_LOG_AGE_S)
-
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  protected val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -421,7 +419,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // reading a garbage file is safe, but we would log an error which can be scary to
           // the end-user.
           !entry.getPath().getName().startsWith(".") &&
-            checkAccessPermission(entry.getPath, FsAction.READ)
+            !isBlacklisted(entry.getPath)
         }
         .filter { entry =>
           try {
@@ -464,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
     }
 
-    val tasks = updated.map { entry =>
+    val tasks = updated.flatMap { entry =>
       try {
-        replayExecutor.submit(new Runnable {
+        val task: Future[Unit] = replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-        })
+        }, Unit)
+        Some(task -> entry.getPath)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          null
+          None
       }
-    }.filter(_ != null)
+    }
 
     pendingReplayTasksCount.addAndGet(tasks.size)
 
     // Wait for all tasks to finish. This makes sure that checkForLogs
     // is not scheduled again while some tasks are already running in
     // the replayExecutor.
-    tasks.foreach { task =>
+    tasks.foreach { case (task, path) =>
       try {
         task.get()
       } catch {
         case e: InterruptedException =>
           throw e
+        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+          // We don't have read permissions on the log file
+          logDebug(s"Unable to read log $path", e.getCause)
+          blacklist(path)
         case e: Exception =>
           logError("Exception while merging application listings", e)
       } finally {
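
The new `ExecutionException` case in the catch block relies on the contract of `java.util.concurrent.Future.get()`: an exception thrown inside the submitted task is re-thrown on the calling thread wrapped in `ExecutionException`, and the original failure (here, Hadoop's `AccessControlException` raised while opening an unreadable log) is recovered via `getCause`. A self-contained sketch of that unwrapping, with an illustrative exception type standing in for Hadoop's:

import java.util.concurrent.{ExecutionException, Executors}

// Illustrative stand-in for org.apache.hadoop.security.AccessControlException.
class AccessDenied(msg: String) extends RuntimeException(msg)

object UnwrapDemo extends App {
  val pool = Executors.newSingleThreadExecutor()
  // The failure happens on the worker thread...
  val task = pool.submit(new Runnable {
    override def run(): Unit = throw new AccessDenied("no read permission")
  }, ())
  try {
    task.get() // ...and surfaces here, wrapped in an ExecutionException.
  } catch {
    case e: ExecutionException if e.getCause.isInstanceOf[AccessDenied] =>
      println(s"would blacklist this log: ${e.getCause.getMessage}")
  } finally {
    pool.shutdown()
  }
}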
@@ -736,7 +739,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
   private[history] def cleanLogs(): Unit = Utils.tryLog {
-    val maxTime = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
     val expired = listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
@@ -782,8 +785,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
-    // Ensure the cache gets rid of the expired entries.
-    cache.cleanUp()
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }
 
   /**
@@ -943,13 +946,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def deleteLog(log: Path): Unit = {
-    try {
-      fs.delete(log, true)
-    } catch {
-      case _: AccessControlException =>
-        logInfo(s"No permission to delete $log, ignoring.")
-      case ioe: IOException =>
-        logError(s"IOException in cleaning $log", ioe)
+    if (isBlacklisted(log)) {
+      logDebug(s"Skipping deleting $log as we don't have permissions on it.")
+    } else {
+      try {
+        fs.delete(log, true)
+      } catch {
+        case _: AccessControlException =>
+          logInfo(s"No permission to delete $log, ignoring.")
+        case ioe: IOException =>
+          logError(s"IOException in cleaning $log", ioe)
+      }
     }
   }
 
@@ -978,39 +985,35 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
-private[history] trait CachedFileSystemHelper extends Logging {
-  protected def fs: FileSystem
-  protected def expireTimeInSeconds: Long
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
 
   /**
-   * LRU cache containing the result for the already checked files.
+   * Contains the name of blacklisted files and their insertion time.
    */
-  // Visible for testing.
-  private[history] val cache = CacheBuilder.newBuilder()
-    .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
-    .build[String, java.lang.Boolean]()
+  private val blacklist = new ConcurrentHashMap[String, Long]
 
-  private val userName = UserGroupInformation.getCurrentUser.getShortUserName
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
 
-  private[history] def checkAccessPermission(path: Path, mode: FsAction): Boolean = {
-    Option(cache.getIfPresent(path.getName)).map(_.booleanValue())
-      .getOrElse(doCheckAccessPermission(path, mode))
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
   }
 
-  private def doCheckAccessPermission(path: Path, mode: FsAction): Boolean = {
-    Try(fs.access(path, mode)) match {
-      case Success(_) =>
-        cache.put(path.getName, true)
-        true
-      case Failure(e: AccessControlException) =>
-        logInfo(s"Permission denied for user '$userName' to access $path", e)
-        cache.put(path.getName, false)
-        false
-      case Failure(_) =>
-        // When we are unable to check whether we can access the file we don't cache the result
-        // so we can retry later
-        false
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
+      case (path, creationTime) if creationTime < expiredThreshold => expired += path
     }
+    expired.foreach(blacklist.remove(_))
   }
 }
 
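Two details of the trait above are worth noting: the blacklist is keyed on the file name (`path.getName`) rather than the full path, and entries carry their insertion time, so a file that could not be read is retried once `clearBlacklist` (invoked from `cleanLogs` with `CLEAN_INTERVAL_S`) drops its entry. A standalone sketch of the same bookkeeping, using a hand-rolled clock in place of Spark's `Clock` (names are illustrative):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object BlacklistSketch extends App {
  var now = 0L // hand-rolled manual clock, in milliseconds
  val blacklist = new ConcurrentHashMap[String, Long]

  def add(name: String): Unit = blacklist.put(name, now)
  def isBlacklisted(name: String): Boolean = blacklist.containsKey(name)
  def clear(expireTimeInSeconds: Long): Unit = {
    val threshold = now - expireTimeInSeconds * 1000
    // Collect first, then remove, rather than mutating while iterating.
    val expired = blacklist.asScala.filter { case (_, t) => t < threshold }.keys
    expired.foreach(blacklist.remove(_))
  }

  add("accessDenied")
  now += 24 * 60 * 60 * 1000 + 1 // a bit more than one day, as in the test below
  clear(24 * 60 * 60)            // expire entries older than one day
  assert(!isBlacklisted("accessDenied"))
}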

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,15 +27,13 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -822,31 +820,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("SPARK-24948: delegate permission check to the file system class") {
-    val helper = new CachedFileSystemHelper {
-      override protected val fs: FileSystem = mock(classOf[FileSystem])
-      when(fs.access(any[Path], any[FsAction])).thenAnswer(new Answer[Unit] {
-        override def answer(invocation: InvocationOnMock): Unit = {
-          invocation.getArgumentAt(0, classOf[Path]).getName match {
-            case "accessGranted" =>
-            case "accessDenied" => throw new AccessControlException("File not found.")
-            case _ => throw new FileNotFoundException("File not found.")
-          }
+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
         }
-      })
-
-      override protected def expireTimeInSeconds = 5L
-    }
-    assert(helper.checkAccessPermission(new Path("accessGranted"), FsAction.READ))
-    assert(helper.cache.getIfPresent("accessGranted"))
-    assert(!helper.checkAccessPermission(new Path("accessDenied"), FsAction.READ))
-    assert(!helper.cache.getIfPresent("accessDenied"))
-    assert(!helper.checkAccessPermission(new Path("nonExisting"), FsAction.READ))
-    assert(helper.cache.getIfPresent("nonExisting") == null)
-    Thread.sleep(5000) // wait for the cache entries to expire
-    helper.cache.cleanUp()
-    assert(helper.cache.getIfPresent("accessGranted") == null)
-    assert(helper.cache.getIfPresent("accessGranted") == null)
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
   }
 
   /**
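
One Mockito subtlety in the rewritten test: because `mockedFs` is a `spy` wrapping a real `FileSystem`, the stubbing uses the `doThrow(...).when(mockedFs).open(...)` form; the more common `when(mockedFs.open(...)).thenThrow(...)` would invoke the real `open()` while the stub is being set up. A minimal sketch of the idiom, with illustrative classes in place of Hadoop's:

import org.mockito.Mockito.{doThrow, spy}

class Reader {
  def open(name: String): String = s"contents of $name"
}

object SpyDemo extends App {
  val spied = spy(new Reader)
  // doThrow(...).when(...) stubs without calling the real method first;
  // when(spied.open("secret")).thenThrow(...) would run the real open().
  doThrow(new RuntimeException("access denied")).when(spied).open("secret")

  println(spied.open("public")) // non-stubbed calls keep the real behavior
  try spied.open("secret")
  catch { case e: RuntimeException => println(s"stubbed: ${e.getMessage}") }
}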