Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle slow security policies without blocking gRPC threads. #10633

Merged
merged 11 commits into from
Dec 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public Server build() {
checkState(!isBuilt, "BinderServerBuilder can only be used to build one server instance.");
isBuilt = true;
// We install the security interceptor last, so it's closest to the transport.
BinderTransportSecurity.installAuthInterceptor(this);
BinderTransportSecurity.installAuthInterceptor(this, serverImplBuilder.getExecutorPool());
return super.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package io.grpc.binder.internal;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.Attributes;
import io.grpc.Internal;
import io.grpc.Metadata;
Expand All @@ -28,9 +30,13 @@
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

/**
* Manages security for an Android Service hosted gRPC server.
Expand All @@ -51,8 +57,10 @@ private BinderTransportSecurity() {}
* @param serverBuilder The ServerBuilder being used to create the server.
*/
@Internal
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
serverBuilder.intercept(new ServerAuthInterceptor());
public static void installAuthInterceptor(
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
ServerBuilder<?> serverBuilder,
ObjectPool<? extends Executor> executorPool) {
serverBuilder.intercept(new ServerAuthInterceptor(executorPool));
}

/**
Expand All @@ -78,30 +86,64 @@ public static void attachAuthAttrs(
* Authentication state is fetched from the call attributes, inherited from the transport.
*/
private static final class ServerAuthInterceptor implements ServerInterceptor {

private final ObjectPool<? extends Executor> executorPool;

ServerAuthInterceptor(ObjectPool<? extends Executor> executorPool) {
this.executorPool = executorPool;
}

/**
* @param authStatusFuture a Future that is known to be complete, i.e.
* {@link ListenableFuture#isDone()} returns true.
*/
private <ReqT, RespT> ServerCall.Listener<ReqT> newServerCallListenerForDoneAuthResult(
ListenableFuture<Status> authStatusFuture,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
Status authStatus;
try {
authStatus = Futures.getDone(authStatusFuture);
} catch (ExecutionException e) {
authStatus = Status.INTERNAL.withCause(e);
}

if (authStatus.isOk()) {
return next.startCall(call, headers);
}
call.close(authStatus, new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Status authStatus =
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
ListenableFuture<Status> authStatusFuture =
call.getAttributes()
.get(TRANSPORT_AUTHORIZATION_STATE)
.checkAuthorization(call.getMethodDescriptor());
if (authStatus.isOk()) {
return next.startCall(call, headers);
} else {
call.close(authStatus, new Metadata());
return new ServerCall.Listener<ReqT>() {};

// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code.
if (authStatusFuture.isDone()) {
return newServerCallListenerForDoneAuthResult(authStatusFuture, call, headers, next);
}

return PendingAuthListener.create(authStatusFuture, executorPool, call, headers, next);
}
}

/**
* Maintaines the authorization state for a single transport instance. This class lives for the
* Maintains the authorization state for a single transport instance. This class lives for the
* lifetime of a single transport.
*/
private static final class TransportAuthorizationState {
private final int uid;
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, Status> serviceAuthorization;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;

TransportAuthorizationState(int uid, ServerPolicyChecker serverPolicyChecker) {
this.uid = uid;
Expand All @@ -111,32 +153,27 @@ private static final class TransportAuthorizationState {

/** Get whether we're authorized to make this call. */
@CheckReturnValue
Status checkAuthorization(MethodDescriptor<?, ?> method) {
ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
String serviceName = method.getServiceName();
// Only cache decisions if the method can be sampled for tracing,
// which is true for all generated methods. Otherwise, programatically
// created methods could casue this cahe to grow unbounded.
// which is true for all generated methods. Otherwise, programmatically
// created methods could cause this cache to grow unbounded.
boolean useCache = method.isSampledToLocalTracing();
Status authorization;
if (useCache) {
authorization = serviceAuthorization.get(serviceName);
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
if (authorization != null) {
return authorization;
}
}
try {
// TODO(10566): provide a synchronous version of "checkAuthorization" to avoid blocking the
// calling thread on the completion of the future.
authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName).get();
} catch (ExecutionException e) {
// Do not cache this failure since it may be transient.
return Status.fromThrowable(e);
} catch (InterruptedException e) {
// Do not cache this failure since it may be transient.
Thread.currentThread().interrupt();
return Status.CANCELLED.withCause(e);
}
// Under high load, this may trigger a large number of concurrent authorization checks that
// perform essentially the same work and have the potential of exhausting the resources they
// depend on. This was a non-issue in the past with synchronous policy checks due to the
// fixed-size nature of the thread pool this method runs under.
//
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
}
Expand Down
156 changes: 156 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/PendingAuthListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package io.grpc.binder.internal;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;
import io.grpc.internal.ObjectPool;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

/**
* A {@link ServerCall.Listener} that can be returned by a {@link io.grpc.ServerInterceptor} to
* asynchronously advance the gRPC pending resolving a possibly asynchronous security policy check.
*/
final class PendingAuthListener<ReqT, RespT> extends ServerCall.Listener<ReqT> {

private final ConcurrentLinkedQueue<ListenerConsumer<ReqT>> pendingSteps =
new ConcurrentLinkedQueue<>();
private final AtomicReference<ServerCall.Listener<ReqT>> delegateRef =
new AtomicReference<>(null);

/**
* @param authStatusFuture a ListenableFuture holding the result status of the authorization
* policy from a {@link io.grpc.binder.SecurityPolicy} or a
* {@link io.grpc.binder.AsyncSecurityPolicy}. The call only progresses
* if {@link Status#isOk()} is true.
* @param executorPool a pool that can provide at least one Executor under which the result
* of {@code authStatusFuture} can be handled, progressing the gRPC
* stages.
* @param call the 'call' parameter from {@link io.grpc.ServerInterceptor}
* @param headers the 'headers' parameter from {@link io.grpc.ServerInterceptor}
* @param next the 'next' parameter from {@link io.grpc.ServerInterceptor}
*/
static <ReqT, RespT> PendingAuthListener<ReqT, RespT> create(
ListenableFuture<Status> authStatusFuture,
ObjectPool<? extends Executor> executorPool,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
PendingAuthListener<ReqT, RespT> listener = new PendingAuthListener<>();
Executor executor = executorPool.getObject();
Futures.addCallback(
authStatusFuture,
new FutureCallback<Status>() {
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onSuccess(Status authStatus) {
try {
if (!authStatus.isOk()) {
call.close(authStatus, new Metadata());
return;
}

listener.startCall(call, headers, next);
} finally {
executorPool.returnObject(executor);
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public void onFailure(Throwable t) {
call.close(
Status.INTERNAL.withCause(t).withDescription("Authorization future failed"),
new Metadata());
executorPool.returnObject(executor);
}
}, executor);
return listener;
}

private PendingAuthListener() {}

private void startCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall.Listener<ReqT> delegate;
try {
delegate = next.startCall(call, headers);
} catch (RuntimeException e) {
call.close(
Status
.INTERNAL
.withCause(e)
.withDescription("Failed to start server call after authorization check"),
new Metadata());
return;
}
delegateRef.set(delegate);
maybeRunPendingSteps();
}

/**
* Runs any enqueued step in this ServerCall listener as long as the authorization check is
* complete. Otherwise, no-op and returns immediately.
*/
private void maybeRunPendingSteps() {
@Nullable ServerCall.Listener<ReqT> delegate = delegateRef.get();
if (delegate == null) {
return;
}

// This section is synchronized so that no 2 threads may attempt to retrieve elements from the
// queue in order but end up executing the steps out of order.
synchronized (this) {
ListenerConsumer<ReqT> nextStep;
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
while ((nextStep = pendingSteps.poll()) != null) {
nextStep.accept(delegate);
}
}
}

@Override
public void onCancel() {
pendingSteps.offer(ServerCall.Listener::onCancel);
maybeRunPendingSteps();
}

@Override
public void onComplete() {
pendingSteps.offer(ServerCall.Listener::onComplete);
maybeRunPendingSteps();
}

@Override
public void onHalfClose() {
pendingSteps.offer(ServerCall.Listener::onHalfClose);
maybeRunPendingSteps();
}

@Override
public void onMessage(ReqT message) {
pendingSteps.offer(delegate -> delegate.onMessage(message));
maybeRunPendingSteps();
}

@Override
public void onReady() {
pendingSteps.offer(ServerCall.Listener::onReady);
maybeRunPendingSteps();
}

/**
* Similar to Java8's {@link java.util.function.Consumer}, but redeclared in order to support
* Android SDK 21.
*/
private interface ListenerConsumer<ReqT> {
void accept(ServerCall.Listener<ReqT> listener);
}
}
Loading