[SPARK-24948][SHS][BACKPORT-2.2] Delegate check access permissions to the file system #22022

Closed · wants to merge 4 commits
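In short, this change drops the history server's hand-rolled permission check (removed from `SparkHadoopUtil` below) and delegates the decision to the file system itself: the provider simply attempts to read each event log and, when the file system throws `AccessControlException`, records the path in a blacklist so that later scans skip it. The snippet below is a minimal sketch of that pattern, not the actual Spark code; the class and method names (`DeniedPathTracker`, `tryOpen`) are illustrative only.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

// Illustrative sketch: the permission check is delegated to the file system
// by attempting the read and reacting to AccessControlException.
class DeniedPathTracker(fs: FileSystem) {
  private val deniedPaths = new ConcurrentHashMap[String, Long]

  def tryOpen(path: Path): Boolean = {
    if (deniedPaths.containsKey(path.getName)) {
      false
    } else {
      try {
        val in = fs.open(path)
        in.close()
        true
      } catch {
        case _: AccessControlException =>
          // The file system said no; remember the path so future scans skip it.
          deniedPaths.put(path.getName, System.currentTimeMillis())
          false
      }
    }
  }
}
```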
22 changes: 0 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -29,7 +29,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -379,27 +378,6 @@ class SparkHadoopUtil extends Logging {
buffer.toString
}

private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
val perm = status.getPermission
val ugi = UserGroupInformation.getCurrentUser

if (ugi.getShortUserName == status.getOwner) {
if (perm.getUserAction.implies(mode)) {
return true
}
} else if (ugi.getGroupNames.contains(status.getGroup)) {
if (perm.getGroupAction.implies(mode)) {
return true
}
} else if (perm.getOtherAction.implies(mode)) {
return true
}

logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}
}

object SparkHadoopUtil {
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,16 +19,16 @@ package org.apache.spark.deploy.history

import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.xml.Node

import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
@@ -105,7 +105,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = new Path(logDir).getFileSystem(hadoopConf)
// Visible for testing
private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -129,6 +130,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)

private val blacklist = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
@@ -326,7 +346,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// the end-user.
!entry.getPath().getName().startsWith(".") &&
prevFileSize < entry.getLen() &&
SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
!isBlacklisted(entry.getPath)
}
.flatMap { entry => Some(entry) }
.sortWith { case (entry1, entry2) =>
@@ -481,6 +501,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

} catch {
case e: AccessControlException =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log ${fileStatus.getPath}", e.getCause)
blacklist(fileStatus.getPath)
None
case e: Exception =>
logError(
s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
@@ -587,6 +612,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}

/**
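For reference, the blacklist introduced above is just a name-to-timestamp map whose entries are dropped once they are older than the clean interval. Below is a self-contained sketch of that bookkeeping; `BlacklistSketch` is a hypothetical standalone object using wall-clock time in place of Spark's `Clock`, and it assumes Scala 2.11/2.12, where `mutable.Map.retain` is still available (as in the diff above).

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

// Hypothetical standalone sketch of the blacklist bookkeeping added to FsHistoryProvider.
object BlacklistSketch {
  private val blacklist = new ConcurrentHashMap[String, Long]

  def add(name: String, nowMs: Long): Unit = blacklist.put(name, nowMs)

  def isBlacklisted(name: String): Boolean = blacklist.containsKey(name)

  // Mirror of clearBlacklist(): drop entries older than expireTimeInSeconds.
  def clear(nowMs: Long, expireTimeInSeconds: Long): Unit = {
    val threshold = nowMs - expireTimeInSeconds * 1000
    blacklist.asScala.retain((_, addedAt) => addedAt >= threshold)
  }

  def main(args: Array[String]): Unit = {
    add("accessDenied", nowMs = 0L)
    clear(nowMs = 24L * 60 * 60 * 1000 + 1, expireTimeInSeconds = 24 * 60 * 60)
    println(isBlacklisted("accessDenied")) // false: the entry has expired
  }
}
```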

This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,11 +27,13 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, spy, verify}
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -135,14 +137,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// setReadable(...) does not work on Windows. Please refer JDK-6728842.
assume(!Utils.isWindows)

class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
var mergeApplicationListingCall = 0
override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
super.mergeApplicationListing(fileStatus)
mergeApplicationListingCall += 1
}
}
val provider = new TestFsHistoryProvider
val provider = new FsHistoryProvider(createTestConf())

val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
@@ -159,8 +154,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
updateAndCheck(provider) { list =>
list.size should be (1)
}

provider.mergeApplicationListingCall should be (1)
}

test("history file is renamed from inprogress to completed") {
@@ -583,6 +576,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("SPARK-24948: blacklist files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
val accessGranted = newLogFile("accessGranted", None, inProgress = false)
writeFile(accessGranted, true, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
argThat(new ArgumentMatcher[Path]() {
override def matches(path: Any): Boolean = {
path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
}
}))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
writeFile(accessDenied, true, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
// Doing 2 times in order to check the blacklist filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
mockedProvider.cleanLogs()
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
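A note on the stubbing style in the test above: `mockedFs` is a Mockito spy around a real `FileSystem`, so the stub is installed with `doThrow(...).when(spy).open(...)` rather than `when(spy.open(...)).thenThrow(...)`; with a spy, the latter would invoke the real `open` while the stub is being set up. The sketch below shows the same idiom in isolation; `Reader` and `SpyStubbingSketch` are hypothetical names, not part of Spark.

```scala
import org.mockito.Mockito.{doThrow, spy}

// Hypothetical class used only to illustrate stubbing a spy.
class Reader {
  def read(name: String): String = s"contents of $name"
}

object SpyStubbingSketch {
  def main(args: Array[String]): Unit = {
    val spiedReader = spy(new Reader)
    // when(spiedReader.read("secret")) would call the real read() during stubbing;
    // the doThrow(...).when(...) form configures the spy without calling through.
    doThrow(new RuntimeException("denied")).when(spiedReader).read("secret")

    println(spiedReader.read("public")) // real method runs
    try spiedReader.read("secret")
    catch { case e: RuntimeException => println(s"stubbed failure: ${e.getMessage}") }
  }
}
```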