[SPARK-24948][SHS][BACKPORT-2.2] Delegate check access permissions to the file system

## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only the basic POSIX permission bits when checking whether a user can access a file. This check is incomplete: it ignores ACLs and any other policies the file system may apply internally, so it can wrongly report that a user cannot access a file when in fact they can.
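
As a concrete example of what the bits-only check misses (a hypothetical scenario; the users and ACL entry are made up): suppose a file owned by `bob` has mode `rw-------`, but an HDFS ACL entry `user:alice:r--` grants `alice` read access. A client-side test of the permission bits, mirroring the logic of the method removed below, answers incorrectly:

```scala
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

// Permission bits rw------- : only the owner may read or write.
val perm = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)

// alice is neither the owner nor in the file's group, so a bits-only
// check falls through to the "other" bits...
val aliceIsOwner = false
val aliceInGroup = false
val bitsAllowRead =
  if (aliceIsOwner) perm.getUserAction.implies(FsAction.READ)
  else if (aliceInGroup) perm.getGroupAction.implies(FsAction.READ)
  else perm.getOtherAction.implies(FsAction.READ)

// ...and concludes "no read access", even though the NameNode would
// grant alice READ through the ACL entry user:alice:r--.
assert(!bitsAllowRead)
```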

This PR delegates the accessibility check to the file system itself, which returns the correct result. A caching layer (a blacklist of paths that failed the check) is added for performance.
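
To illustrate the idea (a minimal sketch, not the code in this PR): since Hadoop 2.6, `FileSystem#access` asks the file system to evaluate permissions server-side, so permission bits, ACLs, and any other policy are all honored. The patch itself, as the `FsHistoryProvider` diff below shows, takes an even simpler route: it attempts the read and, on `AccessControlException`, blacklists the path so subsequent scans skip it.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

// Delegate the whole question to the file system: whatever policy it
// enforces (permission bits, ACLs, ...) decides the answer.
def canRead(fs: FileSystem, path: Path): Boolean =
  try {
    fs.access(path, FsAction.READ) // throws AccessControlException when denied
    true
  } catch {
    case _: AccessControlException => false
  }
```

Attempt-and-catch avoids an extra RPC per file per scan, which is presumably why the blacklist (with time-based expiry, cleared during log cleaning) serves as the caching layer instead of re-checking access on every poll.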

## How was this patch tested?

Added a unit test.

Author: Marco Gaido <[email protected]>

Closes apache#22022 from mgaido91/SPARK-24948_2.2.
mgaido91 authored and William Montaz committed Sep 26, 2019
1 parent fe6996b commit 0d101ce
Showing 4 changed files with 73 additions and 136 deletions.
22 changes: 0 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -29,7 +29,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -379,27 +378,6 @@ class SparkHadoopUtil extends Logging {
    buffer.toString
  }

-  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
-    val perm = status.getPermission
-    val ugi = UserGroupInformation.getCurrentUser
-
-    if (ugi.getShortUserName == status.getOwner) {
-      if (perm.getUserAction.implies(mode)) {
-        return true
-      }
-    } else if (ugi.getGroupNames.contains(status.getGroup)) {
-      if (perm.getGroupAction.implies(mode)) {
-        return true
-      }
-    } else if (perm.getOtherAction.implies(mode)) {
-      return true
-    }
-
-    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-      s"${if (status.isDirectory) "d" else "-"}$perm")
-    false
-  }
}

object SparkHadoopUtil {
36 changes: 31 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -22,14 +22,14 @@ import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.xml.Node

-import collection.JavaConverters._
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
@@ -106,7 +106,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  private val fs = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

  // Used by check event thread and clean log thread.
  // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -131,6 +132,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)

+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+  }

  /**
   * Return a runnable that performs the given operation on the event logs.
   * This operation is expected to be executed periodically.
@@ -328,7 +348,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
          // the end-user.
          !entry.getPath().getName().startsWith(".") &&
            prevFileSize < entry.getLen() &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+            !isBlacklisted(entry.getPath)
        }
        .flatMap { entry => Some(entry) }
        .sortWith { case (entry1, entry2) =>
@@ -483,6 +503,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
          }

        } catch {
+          case e: AccessControlException =>
+            // We don't have read permissions on the log file
+            logWarning(s"Unable to read log ${fileStatus.getPath}", e.getCause)
+            blacklist(fileStatus.getPath)
+            None
          case e: Exception =>
            logError(
              s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
@@ -590,6 +615,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    } catch {
      case t: Exception => logError("Exception in cleaning logs", t)
    }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
  }

  /**

96 changes: 0 additions & 96 deletions core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala

This file was deleted.

55 changes: 42 additions & 13 deletions core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,11 +27,13 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -135,14 +137,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
    // setReadable(...) does not work on Windows. Please refer JDK-6728842.
    assume(!Utils.isWindows)

-    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
-      var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-        super.mergeApplicationListing(fileStatus)
-        mergeApplicationListingCall += 1
-      }
-    }
-    val provider = new TestFsHistoryProvider
+    val provider = new FsHistoryProvider(createTestConf())

    val logFile1 = newLogFile("new1", None, inProgress = false)
    writeFile(logFile1, true, None,
@@ -159,8 +154,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
    updateAndCheck(provider) { list =>
      list.size should be (1)
    }
-
-    provider.mergeApplicationListingCall should be (1)
  }

test("history file is renamed from inprogress to completed") {
Expand Down Expand Up @@ -583,6 +576,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
    }
  }

test("SPARK-24948: blacklist files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
val accessGranted = newLogFile("accessGranted", None, inProgress = false)
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
argThat(new ArgumentMatcher[Path]() {
override def matches(path: Any): Boolean = {
path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
}
}))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
mockedProvider.cleanLogs()
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
}

  /**
   * Asks the provider to check for logs and calls a function to perform checks on the updated
   * app list. Example:
