From aec9b8682ac33216743ad3d8d522398ef9922444 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 1 Aug 2018 16:37:37 +0200
Subject: [PATCH] check access when reading and use blacklist

---
 .../deploy/history/FsHistoryProvider.scala    | 107 +++++++++---------
 .../history/FsHistoryProviderSuite.scala      |  67 ++++++-----
 2 files changed, 92 insertions(+), 82 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3ebd758818fb3..911de71cf2b54 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,24 +21,23 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.io.Source
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
-import com.google.common.cache.CacheBuilder
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
-import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
+import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -81,8 +80,8 @@ import org.apache.spark.util.kvstore._
  * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
  * maintains this invariant.
  */
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with CachedFileSystemHelper with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
   def this(conf: SparkConf) = {
     this(conf, new SystemClock())
   }
@@ -114,10 +113,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
 
-  protected val expireTimeInSeconds = conf.get(MAX_LOG_AGE_S)
-
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  protected val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -421,7 +419,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // reading a garbage file is safe, but we would log an error which can be scary to
          // the end-user.
           !entry.getPath().getName().startsWith(".") &&
-          checkAccessPermission(entry.getPath, FsAction.READ)
+          !isBlacklisted(entry.getPath)
         }
         .filter { entry =>
           try {
@@ -464,32 +462,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
     }
 
-    val tasks = updated.map { entry =>
+    val tasks = updated.flatMap { entry =>
       try {
-        replayExecutor.submit(new Runnable {
+        val task: Future[Unit] = replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-        })
+        }, Unit)
+        Some(task -> entry.getPath)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          null
+          None
       }
-    }.filter(_ != null)
+    }
 
     pendingReplayTasksCount.addAndGet(tasks.size)
 
     // Wait for all tasks to finish. This makes sure that checkForLogs
     // is not scheduled again while some tasks are already running in
     // the replayExecutor.
-    tasks.foreach { task =>
+    tasks.foreach { case (task, path) =>
       try {
         task.get()
       } catch {
         case e: InterruptedException =>
           throw e
+        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+          // We don't have read permissions on the log file
+          logDebug(s"Unable to read log $path", e.getCause)
+          blacklist(path)
         case e: Exception =>
           logError("Exception while merging application listings", e)
       } finally {
@@ -736,7 +739,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
   private[history] def cleanLogs(): Unit = Utils.tryLog {
-    val maxTime = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
     val expired = listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
@@ -782,8 +785,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
-    // Ensure the cache gets rid of the expired entries.
-    cache.cleanUp()
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }
 
   /**
@@ -943,13 +946,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def deleteLog(log: Path): Unit = {
-    try {
-      fs.delete(log, true)
-    } catch {
-      case _: AccessControlException =>
-        logInfo(s"No permission to delete $log, ignoring.")
-      case ioe: IOException =>
-        logError(s"IOException in cleaning $log", ioe)
+    if (isBlacklisted(log)) {
+      logDebug(s"Skipping deleting $log as we don't have permissions on it.")
+    } else {
+      try {
+        fs.delete(log, true)
+      } catch {
+        case _: AccessControlException =>
+          logInfo(s"No permission to delete $log, ignoring.")
+        case ioe: IOException =>
+          logError(s"IOException in cleaning $log", ioe)
+      }
     }
   }
 
@@ -978,39 +985,35 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
-private[history] trait CachedFileSystemHelper extends Logging {
-  protected def fs: FileSystem
-  protected def expireTimeInSeconds: Long
+/**
+ * Manages a blacklist containing the files which cannot be read due to lack of access permissions.
+ */
+private[history] trait LogFilesBlacklisting extends Logging {
+  protected def clock: Clock
 
   /**
-   * LRU cache containing the result for the already checked files.
+   * Contains the name of blacklisted files and their insertion time.
    */
-  // Visible for testing.
-  private[history] val cache = CacheBuilder.newBuilder()
-    .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
-    .build[String, java.lang.Boolean]()
+  private val blacklist = new ConcurrentHashMap[String, Long]
 
-  private val userName = UserGroupInformation.getCurrentUser.getShortUserName
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
 
-  private[history] def checkAccessPermission(path: Path, mode: FsAction): Boolean = {
-    Option(cache.getIfPresent(path.getName)).map(_.booleanValue())
-      .getOrElse(doCheckAccessPermission(path, mode))
+  private[history] def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
   }
 
-  private def doCheckAccessPermission(path: Path, mode: FsAction): Boolean = {
-    Try(fs.access(path, mode)) match {
-      case Success(_) =>
-        cache.put(path.getName, true)
-        true
-      case Failure(e: AccessControlException) =>
-        logInfo(s"Permission denied for user '$userName' to access $path", e)
-        cache.put(path.getName, false)
-        false
-      case Failure(_) =>
-        // When we are unable to check whether we can access the file we don't cache the result
-        // so we can retry later
-        false
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach { case (path, creationTime) =>
+      if (creationTime < expiredThreshold) expired += path
     }
+    expired.foreach(blacklist.remove(_))
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 628cecb8bc402..b4eba755eccbf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,15 +27,13 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -822,31 +820,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("SPARK-24948: delegate permission check to the file system class") {
-    val helper = new CachedFileSystemHelper {
-      override protected val fs: FileSystem = mock(classOf[FileSystem])
-      when(fs.access(any[Path], any[FsAction])).thenAnswer(new Answer[Unit] {
-        override def answer(invocation: InvocationOnMock): Unit = {
-          invocation.getArgumentAt(0, classOf[Path]).getName match {
-            case "accessGranted" =>
-            case "accessDenied" => throw new AccessControlException("File not found.")
-            case _ => throw new FileNotFoundException("File not found.")
-          }
+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
         }
-      })
-
-      override protected def expireTimeInSeconds = 5L
-    }
-    assert(helper.checkAccessPermission(new Path("accessGranted"), FsAction.READ))
-    assert(helper.cache.getIfPresent("accessGranted"))
-    assert(!helper.checkAccessPermission(new Path("accessDenied"), FsAction.READ))
-    assert(!helper.cache.getIfPresent("accessDenied"))
-    assert(!helper.checkAccessPermission(new Path("nonExisting"), FsAction.READ))
-    assert(helper.cache.getIfPresent("nonExisting") == null)
-    Thread.sleep(5000) // wait for the cache entries to expire
-    helper.cache.cleanUp()
-    assert(helper.cache.getIfPresent("accessGranted") == null)
-    assert(helper.cache.getIfPresent("accessGranted") == null)
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
   }
 
   /**