Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CachingUsernamePassword realm #32646

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
*/
package org.elasticsearch.xpack.security.authc.support;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
Expand All @@ -30,17 +28,17 @@

public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm {

private final Cache<String, ListenableFuture<Tuple<AuthenticationResult, UserWithHash>>> cache;
private final Cache<String, ListenableFuture<UserWithHash>> cache;
private final ThreadPool threadPool;
final Hasher cacheHasher;

protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPool threadPool) {
super(type, config);
cacheHasher = Hasher.resolve(CachingUsernamePasswordRealmSettings.CACHE_HASH_ALGO_SETTING.get(config.settings()));
this.threadPool = threadPool;
TimeValue ttl = CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING.get(config.settings());
final TimeValue ttl = CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING.get(config.settings());
if (ttl.getNanos() > 0) {
cache = CacheBuilder.<String, ListenableFuture<Tuple<AuthenticationResult, UserWithHash>>>builder()
cache = CacheBuilder.<String, ListenableFuture<UserWithHash>>builder()
.setExpireAfterWrite(ttl)
.setMaximumWeight(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING.get(config.settings()))
.build();
Expand All @@ -49,13 +47,15 @@ protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPo
}
}

@Override
public final void expire(String username) {
if (cache != null) {
logger.trace("invalidating cache for user [{}] in realm [{}]", username, name());
cache.invalidate(username);
}
}

@Override
public final void expireAll() {
if (cache != null) {
logger.trace("invalidating cache for all users in realm [{}]", name());
Expand All @@ -72,108 +72,77 @@ public final void expireAll() {
*/
@Override
public final void authenticate(AuthenticationToken authToken, ActionListener<AuthenticationResult> listener) {
UsernamePasswordToken token = (UsernamePasswordToken) authToken;
final UsernamePasswordToken token = (UsernamePasswordToken) authToken;
try {
if (cache == null) {
doAuthenticate(token, listener);
} else {
authenticateWithCache(token, listener);
}
} catch (Exception e) {
} catch (final Exception e) {
// each realm should handle exceptions, if we get one here it should be considered fatal
listener.onFailure(e);
}
}

private void authenticateWithCache(UsernamePasswordToken token, ActionListener<AuthenticationResult> listener) {
try {
final SetOnce<User> authenticatedUser = new SetOnce<>();
final AtomicBoolean createdAndStartedFuture = new AtomicBoolean(false);
final ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future = cache.computeIfAbsent(token.principal(), k -> {
final ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> created = new ListenableFuture<>();
if (createdAndStartedFuture.compareAndSet(false, true) == false) {
throw new IllegalStateException("something else already started this. how?");
}
return created;
});

if (createdAndStartedFuture.get()) {
doAuthenticate(token, ActionListener.wrap(result -> {
if (result.isAuthenticated()) {
final User user = result.getUser();
authenticatedUser.set(user);
final UserWithHash userWithHash = new UserWithHash(user, token.credentials(), cacheHasher);
future.onResponse(new Tuple<>(result, userWithHash));
final AtomicBoolean cachedAuthentication = new AtomicBoolean(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename this to authenticationInCache? cachedAuthentication could mean a few different things to me such as creating the future and putting it in the cache

final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> {
final ListenableFuture<UserWithHash> created = new ListenableFuture<>();
// forward a new authenticate request to the external system
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nit but the authentication system may be internal too. So I'd update the comment to say something like attempt authentication against authentication source

doAuthenticate(token, ActionListener.wrap(authResult -> {
if (authResult.isAuthenticated() && authResult.getUser().enabled()) {
// compute the hash of the successful authn request
final UserWithHash userWithHash = new UserWithHash(authResult.getUser(), token.credentials(), cacheHasher);
// notify forestalled request listeners
created.onResponse(userWithHash);
} else {
future.onResponse(new Tuple<>(result, null));
// the inflight request has failed to authenticate
// clear cache, the next request should be forwarded, not halted by a failed
// authn attempt
cache.invalidate(token.principal(), created);
// notify forestalled request listeners
created.onResponse(null);
}
}, future::onFailure));
}

future.addListener(ActionListener.wrap(tuple -> {
if (tuple != null) {
final UserWithHash userWithHash = tuple.v2();
final boolean performedAuthentication = createdAndStartedFuture.get() && userWithHash != null &&
tuple.v2().user == authenticatedUser.get();
handleResult(future, createdAndStartedFuture.get(), performedAuthentication, token, tuple, listener);
} else {
handleFailure(future, createdAndStartedFuture.get(), token, new IllegalStateException("unknown error authenticating"),
listener);
}
}, e -> handleFailure(future, createdAndStartedFuture.get(), token, e, listener)),
threadPool.executor(ThreadPool.Names.GENERIC));
} catch (ExecutionException e) {
listener.onResponse(AuthenticationResult.unsuccessful("", e));
}
}

private void handleResult(ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future, boolean createdAndStartedFuture,
boolean performedAuthentication, UsernamePasswordToken token,
Tuple<AuthenticationResult, UserWithHash> result, ActionListener<AuthenticationResult> listener) {
final AuthenticationResult authResult = result.v1();
if (authResult == null) {
// this was from a lookup; clear and redo
cache.invalidate(token.principal(), future);
authenticateWithCache(token, listener);
} else if (authResult.isAuthenticated()) {
if (performedAuthentication) {
listener.onResponse(authResult);
} else {
UserWithHash userWithHash = result.v2();
if (userWithHash.verify(token.credentials())) {
if (userWithHash.user.enabled()) {
User user = userWithHash.user;
logger.debug("realm [{}] authenticated user [{}], with roles [{}]",
name(), token.principal(), user.roles());
// notify the listener of the inflight authentication request
listener.onResponse(authResult);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the listener is not added to the future because we don't want to create a loop in the failure cases? If so, can you please document this aspect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the listener is not added to the future because we don't want to create a loop in the failure cases?

Yes, the if (authenticationInCache) only adds a listener if this is not the inflight request. The listener retries the request if passwords don't match or the authn failed (when the inflight request returns). The inflight request's result, however, is definitive, it will not be retried, it has reached to the "source of truth" and if it has failed, there is no point in retrying. This strategy, of not retrying requests if they have reached to the source of truth, has the consequence of avoiding the loop in the failure case; given a set of requests that have to be retried, at least one will be handled in the next loop (and not retried anymore) - the one that had reached to the source of authentication.

This has not changed in this refactoring.
I have added comments about when retries happen (and don't happen). I hope it is clearer now.

}, e -> {
// the next request should be forwarded, not halted by a failed authn attempt
cache.invalidate(token.principal(), created);
// notify staved off listeners
created.onFailure(e);
// notify the listener of the inflight authentication request
listener.onFailure(e);
}));
cachedAuthentication.set(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should move this to before the call to doAuthenticate. Even though doAuthenticate is asynchronous there is no requirement that another thread is forked.

Copy link
Contributor Author

@albertzaharovits albertzaharovits Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes any difference if setting this flag is before or after the doAuthenticate.
It is not used in the doAuthenticate listeners.
I have moved it nonetheless because it is closer to where the flag is declared and it is probably easier to reason about it.

return created;
});
if (cachedAuthentication.get()) {
// there is a cached or an inflight authenticate request
listenableCacheEntry.addListener(ActionListener.wrap(authenticatedUserWithHash -> {
if (null == authenticatedUserWithHash) {
// inflight request has failed, try again to reach for the external system
authenticateWithCache(token, listener);
} else if (authenticatedUserWithHash.verify(token.credentials())) {
// cached hash matches the credentials for this forestalled request
final User user = authenticatedUserWithHash.user;
logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), user.roles());
listener.onResponse(AuthenticationResult.success(user));
} else {
// re-auth to see if user has been enabled
cache.invalidate(token.principal(), future);
// cached hash does not match the credentials for this forestalled request.
// however, we should clear cache and try to reach the external system again
// because password might have changed and the cached hash got stale
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}
} else {
// could be a password change?
cache.invalidate(token.principal(), future);
}, e -> {
// try again, the inflight request failed
authenticateWithCache(token, listener);
}
}
} else {
cache.invalidate(token.principal(), future);
if (createdAndStartedFuture) {
listener.onResponse(authResult);
} else {
authenticateWithCache(token, listener);
}), threadPool.executor(ThreadPool.Names.GENERIC));
}
}
}

private void handleFailure(ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future, boolean createdAndStarted,
UsernamePasswordToken token, Exception e, ActionListener<AuthenticationResult> listener) {
cache.invalidate(token.principal(), future);
if (createdAndStarted) {
} catch (final ExecutionException e) {
listener.onFailure(e);
} else {
authenticateWithCache(token, listener);
}
}

Expand All @@ -193,38 +162,54 @@ protected int getCacheSize() {

@Override
public final void lookupUser(String username, ActionListener<User> listener) {
if (cache != null) {
try {
ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> future = cache.computeIfAbsent(username, key -> {
ListenableFuture<Tuple<AuthenticationResult, UserWithHash>> created = new ListenableFuture<>();
doLookupUser(username, ActionListener.wrap(user -> {
if (user != null) {
UserWithHash userWithHash = new UserWithHash(user, null, null);
created.onResponse(new Tuple<>(null, userWithHash));
} else {
created.onResponse(new Tuple<>(null, null));
}
}, created::onFailure));
return created;
});

future.addListener(ActionListener.wrap(tuple -> {
if (tuple != null) {
if (tuple.v2() == null) {
cache.invalidate(username, future);
listener.onResponse(null);
} else {
listener.onResponse(tuple.v2().user);
}
try {
if (cache == null) {
doLookupUser(username, listener);
} else {
lookupWithCache(username, listener);
}
} catch (final Exception e) {
// each realm should handle exceptions, if we get one here it should be
// considered fatal
listener.onFailure(e);
}
}

private void lookupWithCache(String username, ActionListener<User> listener) {
try {
final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(username, key -> {
final ListenableFuture<UserWithHash> created = new ListenableFuture<>();
// forward a new lookup request to the external system
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about external system changing to authentication source.

doLookupUser(username, ActionListener.wrap(user -> {
if (user != null) {
// user found
final UserWithHash userWithHash = new UserWithHash(user, null, null);
// notify forestalled request listeners
created.onResponse(userWithHash);
} else {
listener.onResponse(null);
// user not found, invalidate cache so that subsequent requests are forwarded to
// the external system
cache.invalidate(username, created);
// notify forestalled request listeners
created.onResponse(null);
}
}, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC));
} catch (ExecutionException e) {
listener.onFailure(e);
}
} else {
doLookupUser(username, listener);
}, e -> {
// the next request should be forwarded, not halted by a failed lookup attempt
cache.invalidate(username, created);
// notify forestalled listeners
created.onFailure(e);
}));
return created;
});
listenableCacheEntry.addListener(ActionListener.wrap(userWithHash -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in an else block IMO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, for lookup there is, and there was, no retrying; the listener for the the reaching-out-request is notified just like all the other listeners.
When a lookup returns negatively, it clears the cache, but deferred requests will not be retried, they return negatively as well.

if (userWithHash != null) {
listener.onResponse(userWithHash.user);
} else {
listener.onResponse(null);
}
}, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC));
} catch (final ExecutionException e) {
listener.onFailure(e);
}
}

Expand Down