Skip to content

Commit

Permalink
Handle slow security policies without blocking gRPC threads.
Browse files Browse the repository at this point in the history
- Introduce PendingAuthListener to handle a ListenableFuture<Status>, progressing the gRPC through each stage in sequence once the future completes and is OK.
- Move unit tests away from `checkAuthorizationForService` and into `checkAuthorizationForServiceAsync` since that should be the only method called in production now.

This should be the last PR to address grpc#10566.
  • Loading branch information
mateusazis committed Oct 26, 2023
1 parent b6947de commit 8dd2fc5
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public Server build() {
checkState(!isBuilt, "BinderServerBuilder can only be used to build one server instance.");
isBuilt = true;
// We install the security interceptor last, so it's closest to the transport.
BinderTransportSecurity.installAuthInterceptor(this);
BinderTransportSecurity.installAuthInterceptor(this, serverImplBuilder.getExecutorPool());
return super.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

/**
* Manages security for an Android Service hosted gRPC server.
Expand All @@ -51,8 +53,8 @@ private BinderTransportSecurity() {}
* @param serverBuilder The ServerBuilder being used to create the server.
*/
@Internal
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
serverBuilder.intercept(new ServerAuthInterceptor());
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder, ObjectPool<? extends Executor> executorPool) {
serverBuilder.intercept(new ServerAuthInterceptor(executorPool));
}

/**
Expand All @@ -78,30 +80,33 @@ public static void attachAuthAttrs(
* Authentication state is fetched from the call attributes, inherited from the transport.
*/
private static final class ServerAuthInterceptor implements ServerInterceptor {

private final ObjectPool<? extends Executor> executorPool;

ServerAuthInterceptor(ObjectPool<? extends Executor> executorPool) {
this.executorPool = executorPool;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Status authStatus =
ListenableFuture<Status> authStatusFuture =
call.getAttributes()
.get(TRANSPORT_AUTHORIZATION_STATE)
.checkAuthorization(call.getMethodDescriptor());
if (authStatus.isOk()) {
return next.startCall(call, headers);
} else {
call.close(authStatus, new Metadata());
return new ServerCall.Listener<ReqT>() {};
}

return new PendingAuthListener<>(authStatusFuture, executorPool, call, headers, next);
}
}

/**
* Maintaines the authorization state for a single transport instance. This class lives for the
* Maintains the authorization state for a single transport instance. This class lives for the
* lifetime of a single transport.
*/
private static final class TransportAuthorizationState {
private final int uid;
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, Status> serviceAuthorization;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;

TransportAuthorizationState(int uid, ServerPolicyChecker serverPolicyChecker) {
this.uid = uid;
Expand All @@ -111,32 +116,20 @@ private static final class TransportAuthorizationState {

/** Get whether we're authorized to make this call. */
@CheckReturnValue
Status checkAuthorization(MethodDescriptor<?, ?> method) {
ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
String serviceName = method.getServiceName();
// Only cache decisions if the method can be sampled for tracing,
// which is true for all generated methods. Otherwise, programatically
// created methods could casue this cahe to grow unbounded.
// which is true for all generated methods. Otherwise, programmatically
// created methods could case this cache to grow unbounded.
boolean useCache = method.isSampledToLocalTracing();
Status authorization;
if (useCache) {
authorization = serviceAuthorization.get(serviceName);
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
if (authorization != null) {
return authorization;
}
}
try {
// TODO(10566): provide a synchronous version of "checkAuthorization" to avoid blocking the
// calling thread on the completion of the future.
authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName).get();
} catch (ExecutionException e) {
// Do not cache this failure since it may be transient.
return Status.fromThrowable(e);
} catch (InterruptedException e) {
// Do not cache this failure since it may be transient.
Thread.currentThread().interrupt();
return Status.CANCELLED.withCause(e);
}
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.grpc.binder.internal;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.Status;
import io.grpc.internal.ObjectPool;

import java.util.concurrent.Executor;

/**
* A {@link ServerCall.Listener} that can be returned by a {@link io.grpc.ServerInterceptor} to
* asynchronously advance the gRPC pending resolving a possibly asynchronous security policy check.
*/
final class PendingAuthListener<ReqT, RespT> extends ServerCall.Listener<ReqT> {

private final ListenableFuture<Listener<ReqT>> authStatusFuture;
private final Executor sequentialExecutor;
private final ObjectPool<? extends Executor> executorPool;
private final Executor executor;

/**
* @param authStatusFuture a ListenableFuture holding the result status of the authorization
* policy from a {@link io.grpc.binder.SecurityPolicy} or a
* {@link io.grpc.binder.AsyncSecurityPolicy}. The call only progresses
* if {@link Status#isOk()} is true.
* @param executorPool a pool that can provide at least one Executor under which the result
* of {@code authStatusFuture} can be handled, progressing the gRPC
* stages.
* @param call the 'call' parameter from {@link io.grpc.ServerInterceptor}
* @param headers the 'headers' parameter from {@link io.grpc.ServerInterceptor}
* @param next the 'next' parameter from {@link io.grpc.ServerInterceptor}
*/
PendingAuthListener(
ListenableFuture<Status> authStatusFuture,
ObjectPool<? extends Executor> executorPool,
ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
this.executorPool = executorPool;
this.executor = executorPool.getObject();
this.authStatusFuture = Futures.transform(authStatusFuture, authStatus -> {
if (authStatus.isOk()) {
return next.startCall(call, headers);
}
call.close(authStatus, new Metadata());
throw new IllegalStateException("Auth failed", authStatus.asException());
}, executor);
this.sequentialExecutor = MoreExecutors.newSequentialExecutor(executor);
}

@Override
public void onCancel() {
ListenableFuture<?> unused = Futures.transform(authStatusFuture, delegate -> {
delegate.onCancel();
executorPool.returnObject(executor);
return null;
}, sequentialExecutor);
}

@Override
public void onComplete() {
ListenableFuture<?> unused = Futures.transform(authStatusFuture, delegate -> {
delegate.onComplete();
executorPool.returnObject(executor);
return null;
}, sequentialExecutor);
}

@Override
public void onHalfClose() {
ListenableFuture<?> unused = Futures.transform(authStatusFuture, delegate -> {
delegate.onHalfClose();
return null;
}, sequentialExecutor);
}

@Override
public void onMessage(ReqT message) {
ListenableFuture<?> unused = Futures.transform(authStatusFuture, delegate -> {
delegate.onMessage(message);
return null;
}, sequentialExecutor);
}

@Override
public void onReady() {
ListenableFuture<?> unused = Futures.transform(authStatusFuture, delegate -> {
delegate.onReady();
return null;
}, sequentialExecutor);
}
}
Loading

0 comments on commit 8dd2fc5

Please sign in to comment.