
[SPARK-24948][SHS] Delegate check access permissions to the file system #21895

Closed
wants to merge 9 commits

Conversation


@mgaido91 mgaido91 commented Jul 27, 2018

What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly reporting that a user cannot access a file (even though they actually can).

The PR proposes to delegate the check of whether a file is accessible to the file system itself, so that the correct result is returned. A caching layer is added for performance reasons.
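As a rough illustration of the pattern the PR describes (delegate the check, cache the answer per path), the sketch below uses a plain Java map and a stand-in predicate. The real patch is Scala and calls Hadoop's file system API; every name here is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch only: "checker" stands in for a real file-system call such as
// fs.access(path, FsAction.READ); results are memoized per path.
public class CachedAccessChecker {
    private final Predicate<String> checker;
    private final Map<String, Boolean> cache = new ConcurrentHashMap<>();

    public CachedAccessChecker(Predicate<String> checker) {
        this.checker = checker;
    }

    public boolean checkAccess(String path) {
        // computeIfAbsent consults the file system only on a cache miss.
        return cache.computeIfAbsent(path, checker::test);
    }
}
```

The trade-off discussed later in this thread follows directly from the memoization: a permission change after the first check is invisible until the cached entry goes away.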

How was this patch tested?

Modified unit tests.

@mgaido91 (Contributor Author):

@jerryshao @vanzin could you please take a look at this? Thanks.


SparkQA commented Jul 27, 2018

Test build #93673 has finished for PR 21895 at commit 1052c17.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 27, 2018

Test build #93676 has finished for PR 21895 at commit ef42a93.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor Author):

retest this please


SparkQA commented Jul 28, 2018

Test build #93719 has finished for PR 21895 at commit ef42a93.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor Author):

cc @mridulm too

```scala
 * Cache containing the result for the already checked files.
 */
// Visible for testing.
private[history] val cache = new mutable.HashMap[String, Boolean]
```
Contributor:

For a long-running history server on busy clusters (particularly where spark.history.fs.cleaner.maxAge is configured to be low), this map will cause an OOM.
Either an LRU cache or a disk-backed map with periodic cleanup (based on maxAge) might be better?
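A minimal sketch of the LRU alternative suggested here, using only `java.util.LinkedHashMap` (the fix eventually used a Guava cache instead; this class and its names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Size-bounded LRU cache: once the map exceeds maxEntries, the
// least-recently-accessed entry is evicted, so memory stays bounded
// regardless of how many event logs the SHS scans.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true gives LRU ordering
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

Note that `LinkedHashMap` is not thread-safe, so a real SHS-side cache would need external synchronization or a concurrent cache library.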


mridulm commented Jul 28, 2018

+CC @jerryshao


SparkQA commented Jul 28, 2018

Test build #93726 has finished for PR 21895 at commit 480e326.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 29, 2018

Test build #93735 has finished for PR 21895 at commit 2ad5285.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
  private[history] val CURRENT_LISTING_VERSION = 1L
}

private[history] trait CachedFileSystemHelper extends Logging {
```
Contributor:

As discussed offline, my main concern is cache inconsistency if the user changes the file permissions while the cached entry is still valid.

Contributor Author:

This is true, but the only way to avoid this issue is to call fs.access every time, which may cause huge performance issues. Moreover, I think it is very unlikely that a user manually changes the permissions of an application's event logs, and restarting the SHS in such a scenario would solve the problem. In the current state, even though the file is accessible, it is ignored, and the user has no workaround other than changing ownership or permissions on all the files, despite the user running the SHS being able to read them (moreover, it is a regression for these users)...

Anyway, if you have a better suggestion, I am more than happy to follow it.

```scala
// Visible for testing.
private[history] val cache = CacheBuilder.newBuilder()
  .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
  .build[String, java.lang.Boolean]()
```
@jerryshao (Contributor) commented Aug 1, 2018:

In the real world there will be many event logs under this folder, so this will lead to memory increasing indefinitely and potentially an OOM. We have seen customers with more than 100K event logs in this folder.

Contributor Author:

Memory doesn't increase indefinitely, as entries expire over time. Moreover, since each entry stores only a string containing the file name and a Boolean, it needs about 100 bytes in memory. With 100K event logs, this means about 10 MB, which doesn't seem to me a value that can cause an OOM. Anyway, we can also add a maximum number of entries to this cache if you think it is necessary, though this would cause some more RPC calls.
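For reference, the expire-after-access semantics under discussion can be sketched without Guava. This illustrative Java class mimics `expireAfterAccess` with an injectable clock so the behavior is testable; names are hypothetical and this is not Spark code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Each entry remembers when it was last read; stale entries are dropped
// lazily on access, and a successful read refreshes the expiry, which is
// the expireAfterAccess behavior the PR relies on.
public class ExpiringCache<K, V> {
    private static class Entry<V> { V value; long lastAccess; }

    private final Map<K, Entry<V>> map = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    public ExpiringCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    public void put(K key, V value) {
        Entry<V> e = new Entry<>();
        e.value = value;
        e.lastAccess = clock.getAsLong();
        map.put(key, e);
    }

    public V get(K key) {
        Entry<V> e = map.get(key);
        if (e == null) return null;
        long now = clock.getAsLong();
        if (now - e.lastAccess > ttlMillis) { // expired since last access
            map.remove(key);
            return null;
        }
        e.lastAccess = now; // reading refreshes the expiry
        return e.value;
    }
}
```

The 10 MB estimate above follows the same arithmetic regardless of implementation: roughly 100,000 entries × ~100 bytes each.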


jerryshao commented Aug 1, 2018

My current thinking is to revert SPARK-20172 and improve the logging when an exception is met during the actual read. Also, if a file cannot be read the first time, add it to a blacklist to avoid reading it again.


mgaido91 commented Aug 1, 2018

@jerryshao thanks for your reply and suggestion. That can be done, but I see mainly two problems, IIUC:

  • your suggestion about blacklisting has the same "caching" and "memory leakage" problems as the solution proposed here, i.e. if permissions on a file are changed, we wouldn't be aware of it until the SHS is restarted, and we need to keep the set of blacklisted files in memory (they may be far fewer than the total number of files, so this is probably not a big problem);
  • with your suggestion, we will also try to delete the log file, IIUC, so we also have to lower the access-denied log for file deletion to debug level (I don't think this is a big issue, but it has to be taken into account, as the user may miss other issues that are currently evident at the normal logging level, e.g. if a file is read-only for the Spark user).

What do you think? Thanks.

@jerryshao (Contributor):

I don't think the problems you mentioned are big ones.

  1. For the blacklist mechanism, we can have a time-based reviving mechanism to check whether the permission has changed; compared to checking file permissions for all the files, the cost would not be so high. Also, as you mentioned, permissions are seldom changed, so it is fine even without the check.
  2. I don't think this is a problem; a try-catch with proper logging should be enough.
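The blacklist-with-reviving idea sketched above could look roughly like this (illustrative Java, not the actual Scala patch; `clearBlacklist` mirrors the collect-then-remove shape that the merged code uses):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Paths that failed an access check are recorded with the time they were
// blacklisted; periodically dropping old entries means a fixed ACL gets
// retried eventually ("time-based reviving").
public class LogBlacklist {
    private final Map<String, Long> blacklist = new ConcurrentHashMap<>();

    public void add(String path, long nowMillis) {
        blacklist.put(path, nowMillis);
    }

    public boolean isBlacklisted(String path) {
        return blacklist.containsKey(path);
    }

    // Drop entries older than expireTimeInSeconds: collect expired keys
    // first, then remove them, as the patch's clearBlacklist does.
    public void clearBlacklist(long expireTimeInSeconds, long nowMillis) {
        long threshold = nowMillis - expireTimeInSeconds * 1000;
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : blacklist.entrySet()) {
            if (e.getValue() < threshold) expired.add(e.getKey());
        }
        expired.forEach(blacklist::remove);
    }
}
```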


mgaido91 commented Aug 1, 2018

Thanks for the comments @jerryshao. I will update this PR accordingly.


SparkQA commented Aug 1, 2018

Test build #93881 has finished for PR 21895 at commit aec9b86.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


mgaido91 commented Aug 1, 2018

retest this please


SparkQA commented Aug 1, 2018

Test build #93894 has finished for PR 21895 at commit aec9b86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
```
Contributor:

What is the special advantage of using a mixin trait rather than directly changing the code here in FsHistoryProvider?

Contributor Author:

I just wanted to separate the blacklisting logic, since FsHistoryProvider already contains a lot of code, so I considered it more readable. If you prefer, I can inline it.

Contributor:

This doesn't seem necessary; let's inline this trait.

```scala
try {
  task.get()
} catch {
  case e: InterruptedException =>
    throw e
  case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
    // We don't have read permissions on the log file
    logDebug(s"Unable to read log $path", e.getCause)
```
Contributor:

I would suggest using a warning log the first time we meet such an issue, to notify the user that some event logs cannot be read.

Contributor Author:

Sure, will do, thanks
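For readers unfamiliar with the pattern in the Scala snippet above: `Future.get()` wraps any exception thrown inside the task in an `ExecutionException`, so the real failure has to be unwrapped via `getCause`. A self-contained Java demonstration of the same pattern, with `SecurityException` standing in for Hadoop's `AccessControlException`:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskUnwrap {
    // Runs the task on a worker thread and classifies the failure by
    // inspecting the ExecutionException's cause, as the patch does.
    public static String describe(Callable<String> task) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(task);
            return future.get();
        } catch (ExecutionException e) {
            // The cause carries the exception thrown inside the task.
            if (e.getCause() instanceof SecurityException) {
                return "access denied: " + e.getCause().getMessage();
            }
            throw e;
        } finally {
            pool.shutdown();
        }
    }
}
```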

```scala
protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
  val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
  val expired = new mutable.ArrayBuffer[String]
  blacklist.asScala.foreach {
```
Contributor:

Ideally the iteration should be synchronized, but I think it is not a big deal here.

Contributor Author:

I don't think it is needed, as a new collection is built when calling asScala, so we work on a definite snapshot of the original map.

Contributor:

AFAIK, asScala doesn't copy and create a snapshot of the original map; it just wraps the original map and provides a Scala API. Changes to the original map will also affect the object returned by asScala.

Contributor Author:

Yes, sorry, you are right. I got confused because I changed this line before pushing and was thinking of my previous implementation. So yes, we are not working on a definite snapshot of the values here. But I think this shouldn't be a problem anyway, as we are not updating the values; we may miss processing new entries, but I don't think that is an issue. Thanks.
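The point being conceded here is safe in practice because `ConcurrentHashMap` (which the `asScala` wrapper delegates to in this code) provides weakly consistent iteration: mutating the map while iterating neither throws `ConcurrentModificationException` nor corrupts the map, though the iterator may miss concurrent updates. A small Java demonstration:

```java
import java.util.Map;

public class WeaklyConsistent {
    // Removes entries below the threshold while iterating over the same
    // map. With a ConcurrentHashMap this is legal: its iterators are
    // weakly consistent, so no ConcurrentModificationException is thrown.
    public static int sweep(Map<String, Long> map, long threshold) {
        int removed = 0;
        for (Map.Entry<String, Long> e : map.entrySet()) {
            if (e.getValue() < threshold) {
                map.remove(e.getKey());
                removed++;
            }
        }
        return removed;
    }
}
```

Note the same loop over a plain `HashMap` would throw; the guarantee is specific to the concurrent collections.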

@jerryshao (Contributor):

@mridulm would you please also review this? Thanks!


SparkQA commented Aug 2, 2018

Test build #93991 has finished for PR 21895 at commit c620fff.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


mgaido91 commented Aug 2, 2018

retest this please

@jerryshao (Contributor):

Jenkins, retest this please.

@jerryshao (Contributor):

Ping @mridulm, would you please also review this? Thanks!


SparkQA commented Aug 3, 2018

Test build #94084 has finished for PR 21895 at commit c620fff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao (Contributor):

Hi @mgaido91, would you please check whether this is auto-mergeable to branches 2.2/2.3? If not, please also prepare the fix for the related branches once this is merged.


mgaido91 commented Aug 3, 2018

sure @jerryshao , will do. Thanks for the review.

@mridulm mridulm left a comment

Looks good to me! Left a couple of minor comments though.

```scala
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
        listing.delete(classOf[LogInfo], log.logPath)
      }
    }
    // Clean the blacklist from the expired entries.
    clearBlacklist(CLEAN_INTERVAL_S)
```
Contributor:

My only concern is that, if there happens to be a transient ACL issue when initially accessing a file, we will never see it in the application list even after the ACL is fixed, short of an SHS restart.
I wonder if the clean interval here could be a fraction of CLEAN_INTERVAL_S, so that these files have a chance of making it to the app list without much overhead on the NN.

@mgaido91 (Contributor Author) commented Aug 3, 2018:

This is scheduled every CLEAN_INTERVAL_S anyway, so I don't think changing the value here helps. We could define another config for the blacklist expiration, but that seems overkill to me. I think it is very unlikely that a user changes permissions on these files, and when they do, they can always restart the SHS. Or we could clear the blacklist every fixed amount of time. Honestly, I don't have a strong opinion on which of these options is best.

Contributor:

I misread it as MAX_LOG_AGE_S ... CLEAN_INTERVAL_S should be fine here, you are right.

```scala
blacklist.asScala.foreach {
  case (path, creationTime) if creationTime < expiredThreshold => expired += path
}
expired.foreach(blacklist.remove(_))
```
Contributor:

Instead of this, why not simply:

```scala
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
```
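On the Java side of the wrapper, the equivalent one-liner is `entrySet().removeIf(...)`, which removes matching entries in place (a plain Java sketch, since the snippet above is Scala):

```java
import java.util.Map;

public class RetainDemo {
    // Drops every blacklist entry whose creation time is older than the
    // threshold, in a single bulk call on the entry-set view.
    public static void expire(Map<String, Long> blacklist, long expiredThreshold) {
        blacklist.entrySet().removeIf(e -> e.getValue() < expiredThreshold);
    }
}
```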


SparkQA commented Aug 3, 2018

Test build #94111 has finished for PR 21895 at commit 0a48f9a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 3, 2018

Test build #94127 has finished for PR 21895 at commit 14ae790.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


mgaido91 commented Aug 3, 2018

retest this please


SparkQA commented Aug 3, 2018

Test build #94140 has finished for PR 21895 at commit 14ae790.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


mgaido91 commented Aug 3, 2018

retest this please


SparkQA commented Aug 4, 2018

Test build #94171 has finished for PR 21895 at commit 14ae790.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.


mgaido91 commented Aug 6, 2018

retest this please


SparkQA commented Aug 6, 2018

Test build #94268 has finished for PR 21895 at commit 14ae790.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao jerryshao left a comment

LGTM.

@mridulm do you have any further comment?

@asfgit asfgit closed this in 3c96937 Aug 6, 2018

mridulm commented Aug 6, 2018

I merged to master, thanks for the work @mgaido91 !

@jerryshao (Contributor):

This should also be backported to branches 2.2 and 2.3 @mridulm; this is a regression.

@mgaido91 would you please create backport PRs for the separate branches?


mgaido91 commented Aug 7, 2018

Thanks for your help with this @mridulm @jerryshao .

Yes, sure, I am doing it, I'll ping you once they are ready, thanks.

mgaido91 added a commit to mgaido91/spark that referenced this pull request Aug 7, 2018
In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly reporting that a user cannot access a file (even though they actually can).

The PR proposes to delegate the check of whether a file is accessible to the file system itself, so that the correct result is returned. A caching layer is added for performance reasons.

Modified unit tests.

Author: Marco Gaido <[email protected]>

Closes apache#21895 from mgaido91/SPARK-24948.
mgaido91 added a commit to mgaido91/spark that referenced this pull request Aug 7, 2018