[SPARK-24948][SHS] Delegate check access permissions to the file system #21895
Conversation
@jerryshao @vanzin would you please take a look at this? Thanks.
Test build #93673 has finished for PR 21895 at commit
Test build #93676 has finished for PR 21895 at commit
retest this please
Test build #93719 has finished for PR 21895 at commit
cc @mridulm too
 * Cache containing the result for the already checked files.
 */
// Visible for testing.
private[history] val cache = new mutable.HashMap[String, Boolean]
For a long-running history server in busy clusters (particularly where spark.history.fs.cleaner.maxAge is configured to be low), this Map will cause an OOM. Either an LRU cache or a disk-backed map with periodic cleanup (based on maxAge) might be better?
+CC @jerryshao
Test build #93726 has finished for PR 21895 at commit
Test build #93735 has finished for PR 21895 at commit
@@ -973,6 +978,42 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
As discussed offline, my main concern is cache inconsistency if the user changes the file permissions while the cache entry is still valid.
This is true, but the only way to avoid this issue is to call fs.access every time, which may cause huge performance issues. Moreover, I think it is very unlikely that a user manually changes the permissions of an application's event logs, and restarting the SHS in such a scenario would solve the problem. In the current state, even though the file is accessible, it is ignored and the user has no workaround other than changing the ownership or permissions of all the files, despite the fact that the user running the SHS can read them (moreover, it is a regression for these users)...
Anyway, if you have a better suggestion I am more than happy to follow it.
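For context, here is a minimal sketch of what delegating the check to the file system could look like, using Hadoop's FileSystem.access API; the helper object and method names are assumptions for illustration, not the PR's actual code:

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

// Hypothetical helper: ask the file system itself whether the current user can
// read the path. FileSystem.access consults ACLs and other server-side policies,
// unlike a local re-implementation of the basic permission bits.
object AccessCheckSketch {
  def canRead(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.access(path, FsAction.READ)
      true
    } catch {
      case _: AccessControlException => false   // user is not allowed to read
      case _: FileNotFoundException => false    // file disappeared in the meantime
    }
  }
}
```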
// Visible for testing.
private[history] val cache = CacheBuilder.newBuilder()
  .expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS)
  .build[String, java.lang.Boolean]()
In the real world, there will be many event logs under the folder, so this will lead to memory increasing indefinitely and potentially to an OOM. We have seen a customer with more than 100K event logs in this folder.
Memory doesn't increase indefinitely, as entries expire over time. Moreover, since we are storing only a string containing the name of the file and a Boolean, each entry needs about 100 bytes in memory. With 100K event logs, this means about 10MB, which doesn't seem to me a value which can cause an OOM. Anyway, we can also add a maximum number of entries for this cache if you think it is necessary; this would cause some more RPC calls, though.
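If a size bound were added as discussed, a minimal sketch could look like the following; the 100K cap and one-hour expiry are illustrative assumptions, not values from the PR:

```scala
import java.util.concurrent.TimeUnit
import com.google.common.cache.CacheBuilder

// Sketch of a bounded variant of the cache above: maximumSize caps the memory
// footprint (at the cost of extra fs.access calls after evictions), while
// expireAfterAccess still drops entries that have not been touched recently.
object BoundedAccessCacheSketch {
  val cache = CacheBuilder.newBuilder()
    .maximumSize(100000)                         // illustrative cap, not from the PR
    .expireAfterAccess(3600, TimeUnit.SECONDS)   // illustrative expiry
    .build[String, java.lang.Boolean]()
}
```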
My current thinking is to revert SPARK-20172 and improve the logging when an exception is met during the actual read. Also, if a file cannot be read the first time, add it to a blacklist to avoid reading it again.
@jerryshao thanks for your reply and suggestion. That can be done, but IIUC I see mainly 2 problems:
What do you think? Thanks.
I don't think the problem you mentioned is a big problem.
thanks for the comments @jerryshao. I will update this PR accordingly. Thanks.
Test build #93881 has finished for PR 21895 at commit
retest this please
Test build #93894 has finished for PR 21895 at commit
-private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
-  extends ApplicationHistoryProvider with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, protected val clock: Clock)
+  extends ApplicationHistoryProvider with LogFilesBlacklisting with Logging {
What is the special advantage of using a mixin trait rather than directly changing the code here in FsHistoryProvider?
I just wanted to separate the blacklisting logic, since FsHistoryProvider already contains a lot of code, so I considered it more readable. If you prefer, I can inline it.
This doesn't seem necessary; let's inline this trait.
try {
  task.get()
} catch {
  case e: InterruptedException =>
    throw e
  case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
    // We don't have read permissions on the log file
    logDebug(s"Unable to read log $path", e.getCause)
I would suggest using a warning log the first time we meet such an issue, to notify the user that some event logs cannot be read.
Sure, will do, thanks
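A rough sketch of that "warn once, then stay quiet" idea; the warned set, the object name, and the plain println calls (standing in for Spark's logWarning/logDebug) are all assumptions for illustration, not the PR's actual code:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.security.AccessControlException

// Sketch only: remember which paths we already warned about, so the first
// failure is visible to the operator and later failures stay at debug level.
object WarnOnceSketch {
  private val warned = ConcurrentHashMap.newKeySet[String]()

  def onAccessDenied(path: String, cause: AccessControlException): Unit = {
    if (warned.add(path)) {
      println(s"WARN: Unable to read log $path: ${cause.getMessage}")   // logWarning in practice
    } else {
      println(s"DEBUG: Skipping unreadable log $path")                  // logDebug in practice
    }
  }
}
```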
protected def clearBlacklist(expireTimeInSeconds: Long): Unit = {
  val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
  val expired = new mutable.ArrayBuffer[String]
  blacklist.asScala.foreach {
Ideally the iteration should be synchronized, but I think it is not a big deal here.
I don't think it is needed, as a new collection is built when doing asScala, so we work on a definite snapshot of the original map.
AFAIK, asScala doesn't copy or create a snapshot of the original map; it just wraps the original map and provides a Scala API. Changes to the original map will also affect the object returned by asScala.
Yes, sorry, you are right. I got confused because I changed this line before pushing here and was thinking of my previous implementation. Yes, we are not working on a definite snapshot of the values here, but I don't think this is a problem anyway, as we are not updating the values; we may miss processing new entries, but I don't think that is an issue. Thanks.
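A small sketch illustrating the point: asScala only wraps the Java map, so later changes to the underlying ConcurrentHashMap are visible through the wrapper (the map contents here are made up for the example):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object AsScalaViewSketch {
  def main(args: Array[String]): Unit = {
    val blacklist = new ConcurrentHashMap[String, Long]()
    blacklist.put("hdfs://logs/app-1", 1000L)

    // asScala wraps the Java map; it does not copy it into a new collection.
    val view = blacklist.asScala

    // An update to the underlying map is immediately visible through the wrapper.
    blacklist.put("hdfs://logs/app-2", 2000L)
    println(view.size)   // prints 2, not 1
  }
}
```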
@mridulm would you please also take a look? Thanks!
Test build #93991 has finished for PR 21895 at commit
retest this please
Jenkins, retest this please.
Ping @mridulm, would you please also take a look? Thanks!
Test build #94084 has finished for PR 21895 at commit
Hi @mgaido91, would you please check whether this is auto-mergeable to branches 2.2/2.3? If not, please also prepare the fix for the related branches once this is merged.
sure @jerryshao, will do. Thanks for the review.
Looks good to me! I left a couple of minor comments though.
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.delete(classOf[LogInfo], log.logPath)
     }
   }
+  // Clean the blacklist from the expired entries.
+  clearBlacklist(CLEAN_INTERVAL_S)
My only concern is that, if there happens to be a transient ACL issue when initially accessing the file, we will never see it in the application list even after the ACL is fixed, without an SHS restart.
I am wondering if the clean interval here could be a fraction of CLEAN_INTERVAL_S, so that these files have a chance of making it to the app list without much overhead on the NN.
This is scheduled anyway every CLEAN_INTERVAL_S, so I don't think that changing the value here helps. We may define another config for the blacklisting expiration, but that seems overkill to me. I think it is very unlikely that a user changes permissions on these files, and when they do, they can always restart the SHS. Or we can also decide to clean the blacklist every fixed amount of time. Honestly, I don't have a strong opinion on which of these options is best.
I misread it as MAX_LOG_AGE_S ... CLEAN_INTERVAL_S should be fine here, you are right.
blacklist.asScala.foreach {
  case (path, creationTime) if creationTime < expiredThreshold => expired += path
}
expired.foreach(blacklist.remove(_))
Instead of this, why not simply:
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
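Putting the suggestion together, a self-contained sketch of the blacklist with a single-pass retain cleanup might look like this; the class and method names are assumptions for illustration, and retain is the Scala 2.11/2.12 mutable-map API:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Sketch only: a blacklist mapping a log path to the time (ms) it was added.
class BlacklistSketch(clock: () => Long) {
  private val blacklist = new ConcurrentHashMap[String, Long]()

  def blacklistPath(path: String): Unit = blacklist.put(path, clock())

  def isBlacklisted(path: String): Boolean = blacklist.containsKey(path)

  // Drop expired entries in a single pass, instead of collecting the expired
  // paths into a buffer and removing them afterwards.
  def clearBlacklist(expireTimeInSeconds: Long): Unit = {
    val expiredThreshold = clock() - expireTimeInSeconds * 1000
    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
  }
}
```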
Test build #94111 has finished for PR 21895 at commit
Test build #94127 has finished for PR 21895 at commit
retest this please
Test build #94140 has finished for PR 21895 at commit
retest this please
Test build #94171 has finished for PR 21895 at commit
retest this please
Test build #94268 has finished for PR 21895 at commit
LGTM.
@mridulm do you have any further comments?
I merged this to master, thanks for the work @mgaido91!
Thanks for your help with this @mridulm @jerryshao. Yes, sure, I am working on it; I'll ping you once they are ready, thanks.
In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly returning that a user cannot access a file (even though they actually can). The PR proposes to delegate to the file system the check of whether a file is accessible or not, in order to return the right result. A caching layer is added for performance reasons. Modified UTs. Author: Marco Gaido <[email protected]> Closes apache#21895 from mgaido91/SPARK-24948.
What changes were proposed in this pull request?
In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly returning that a user cannot access a file (even though they actually can). The PR proposes to delegate to the file system the check of whether a file is accessible or not, in order to return the right result. A caching layer is added for performance reasons.
How was this patch tested?
Modified UTs.