Skip to content

Commit

Permalink
Using future callback to remove cancelled/failed auth future from map.
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusazis committed Dec 20, 2023
1 parent 5926b7e commit b50d5a7
Showing 1 changed file with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import io.grpc.Attributes;
import io.grpc.Internal;
Expand All @@ -32,12 +33,10 @@
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;

import java.util.HashMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -106,14 +105,14 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code if the authorization result is already present.
if (!authStatusFuture.isDone() || authStatusFuture.isCancelled()) {
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(authStatusFuture, call, headers, next);
}

Status authStatus;
try {
authStatus = Futures.getDone(authStatusFuture);
} catch (ExecutionException e) {
} catch (ExecutionException | CancellationException e) {
// Failed futures are treated as an internal error rather than a security rejection.
authStatus = Status.INTERNAL.withCause(e);
}
Expand Down Expand Up @@ -163,12 +162,12 @@ public void onFailure(Throwable t) {
private static final class TransportAuthorizationState {
private final int uid;
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;
private final HashMap<String, ListenableFuture<Status>> serviceAuthorization;

TransportAuthorizationState(int uid, ServerPolicyChecker serverPolicyChecker) {
this.uid = uid;
this.serverPolicyChecker = serverPolicyChecker;
serviceAuthorization = new ConcurrentHashMap<>(8);
serviceAuthorization = new HashMap<>(8);
}

/** Get whether we're authorized to make this call. */
Expand All @@ -179,40 +178,40 @@ ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
// which is true for all generated methods. Otherwise, programmatically
// created methods could cause this cache to grow unbounded.
boolean useCache = method.isSampledToLocalTracing();
if (useCache) {
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
if (authorization != null && !isFailedOrCancelled(authorization)) {
// Authorization check exists and is a pending or successful future (even if for a failed
// authorization).
return authorization;

@Nullable ListenableFuture<Status> authorization;
synchronized (serviceAuthorization) {
if (useCache) {
authorization = serviceAuthorization.get(serviceName);
if (authorization != null) {
// Authorization check exists and is a pending or successful future (even if for a
// failed authorization).
return authorization;
}
}


// Under high load, this may trigger a large number of concurrent authorization checks that
// perform essentially the same work and have the potential of exhausting the resources they
// depend on. This was a non-issue in the past with synchronous policy checks due to the
// fixed-size nature of the thread pool this method runs under.
//
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
authorization = serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
if (useCache) {
serviceAuthorization.put(serviceName, authorization);
}
serviceAuthorization.remove(serviceName);
}
// Under high load, this may trigger a large number of concurrent authorization checks that
// perform essentially the same work and have the potential of exhausting the resources they
// depend on. This was a non-issue in the past with synchronous policy checks due to the
// fixed-size nature of the thread pool this method runs under.
//
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
}
return authorization;
}
}

private static <T> boolean isFailedOrCancelled(Future<T> doneFuture) {
if (!doneFuture.isDone()) {
return false;
}
try {
T unused = Futures.getDone(doneFuture);
return false;
} catch (ExecutionException | CancellationException e) {
return true;
final ListenableFuture<Status> finalAuthorization = authorization;

return Futures.catching(authorization, Throwable.class, t -> {
synchronized (serviceAuthorization) {
serviceAuthorization.remove(serviceName, finalAuthorization);
}
throw new IllegalStateException(t);
}, MoreExecutors.directExecutor());
}
}

Expand Down

0 comments on commit b50d5a7

Please sign in to comment.