[SPARK-24948][SHS] Delegate check access permissions to the file system
In `SparkHadoopUtil.checkAccessPermission`, we consider only the basic permission bits when checking whether a user can access a file. This is not a complete check, as it ignores ACLs and any other policies a file system may apply internally, so it can wrongly report that a user cannot access a file even though they actually can.

The PR delegates the check of whether a file is accessible to the file system itself, so that the correct result is returned. A caching layer (a blacklist of files the file system refused to read) is added for performance reasons.
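
The core idea can be sketched as follows. This is a minimal, hypothetical standalone example (the `AccessCheckSketch` object and `canRead` method are made-up names, not code from this patch): rather than re-implementing the permission check, the read is simply attempted and paths rejected by the file system are blacklisted.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

// Hypothetical helper illustrating the approach: delegate the access check to
// the file system (which knows about permission bits, ACLs and any other
// policy) and cache the paths it refuses, so they are not retried on every scan.
object AccessCheckSketch {
  private val denied = new ConcurrentHashMap[String, Long]()

  def canRead(fs: FileSystem, path: Path): Boolean = {
    if (denied.containsKey(path.getName)) {
      false
    } else {
      try {
        val in = fs.open(path) // the file system performs the full access check here
        in.close()
        true
      } catch {
        case _: AccessControlException =>
          denied.put(path.getName, System.currentTimeMillis()) // blacklist the path
          false
      }
    }
  }
}
```

In the actual patch the history provider does not probe files up front: it keeps replaying logs as before, blacklists a log when its replay task fails with `AccessControlException`, skips blacklisted logs on later scans, and expires blacklist entries after the clean interval so that permission changes are eventually picked up.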

modified UTs

Author: Marco Gaido <[email protected]>

Closes apache#21895 from mgaido91/SPARK-24948.
mgaido91 committed Aug 7, 2018
1 parent 136588e commit fb68910
Showing 4 changed files with 89 additions and 140 deletions.
23 changes: 0 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -378,28 +377,6 @@ class SparkHadoopUtil extends Logging {
buffer.toString
}

private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
val perm = status.getPermission
val ugi = UserGroupInformation.getCurrentUser

if (ugi.getShortUserName == status.getOwner) {
if (perm.getUserAction.implies(mode)) {
return true
}
} else if (ugi.getGroupNames.contains(status.getGroup)) {
if (perm.getGroupAction.implies(mode)) {
return true
}
} else if (perm.getOtherAction.implies(mode)) {
return true
}

logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}

def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,19 +19,19 @@ package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.util.{Date, ServiceLoader, UUID}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionException
import scala.util.Try
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
@@ -111,7 +111,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = new Path(logDir).getFileSystem(hadoopConf)
// Visible for testing
private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -155,6 +156,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private val blacklist = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()

/**
@@ -412,7 +432,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
!isBlacklisted(entry.getPath)
}
.filter { entry =>
try {
@@ -446,32 +466,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
}

val tasks = updated.map { entry =>
val tasks = updated.flatMap { entry =>
try {
replayExecutor.submit(new Runnable {
val task: Future[Unit] = replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
})
}, Unit)
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
null
None
}
}.filter(_ != null)
}

pendingReplayTasksCount.addAndGet(tasks.size)

// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
tasks.foreach { task =>
tasks.foreach { case (task, path) =>
try {
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log $path", e.getCause)
blacklist(path)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
@@ -694,6 +719,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}
// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}

/**
@@ -871,13 +898,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

private def deleteLog(log: Path): Unit = {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
if (isBlacklisted(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
}
}
}


This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -29,9 +29,11 @@ import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
import org.mockito.Mockito.{doReturn, mock, spy, verify}
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, argThat}
import org.mockito.Mockito.{doReturn, doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -774,6 +776,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(new File(testDir.toURI).listFiles().size === validLogCount)
}

test("SPARK-24948: blacklist files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
val accessGranted = newLogFile("accessGranted", None, inProgress = false)
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
argThat(new ArgumentMatcher[Path]() {
override def matches(path: Any): Boolean = {
path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
}
}))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
mockedProvider.cleanLogs()
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
