Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle slow security policies without blocking gRPC threads. #10633

Merged
merged 11 commits into from
Dec 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracer
executorServicePool,
streamTracerFactories,
BinderInternal.createPolicyChecker(SecurityPolicies.serverInternalOnly()),
InboundParcelablePolicy.DEFAULT);
InboundParcelablePolicy.DEFAULT,
/* shutdownListener=*/ () -> {});

HostServices.configureService(addr,
HostServices.serviceParamsBuilder()
Expand Down
15 changes: 13 additions & 2 deletions binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;

import java.io.Closeable;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import javax.annotation.Nullable;

/**
* Builder for a server that services requests from an Android Service.
*/
Expand Down Expand Up @@ -72,6 +77,7 @@ public static BinderServerBuilder forPort(int port) {
private ServerSecurityPolicy securityPolicy;
private InboundParcelablePolicy inboundParcelablePolicy;
private boolean isBuilt;
@Nullable private BinderTransportSecurity.ShutdownListener shutdownListener = null;

private BinderServerBuilder(
AndroidComponentAddress listenAddress,
Expand All @@ -85,7 +91,9 @@ private BinderServerBuilder(
schedulerPool,
streamTracerFactories,
BinderInternal.createPolicyChecker(securityPolicy),
inboundParcelablePolicy);
inboundParcelablePolicy,
// 'shutdownListener' should have been set by build()
checkNotNull(shutdownListener));
BinderInternal.setIBinder(binderReceiver, server.getHostBinder());
return server;
});
Expand Down Expand Up @@ -171,7 +179,10 @@ public Server build() {
checkState(!isBuilt, "BinderServerBuilder can only be used to build one server instance.");
isBuilt = true;
// We install the security interceptor last, so it's closest to the transport.
BinderTransportSecurity.installAuthInterceptor(this);
ObjectPool<? extends Executor> executorPool = serverImplBuilder.getExecutorPool();
Executor executor = executorPool.getObject();
BinderTransportSecurity.installAuthInterceptor(this, executor);
shutdownListener = () -> executorPool.returnObject(executor);
return super.build();
}
}
12 changes: 11 additions & 1 deletion binder/src/main/java/io/grpc/binder/internal/BinderServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.SharedResourceHolder;

import java.io.Closeable;
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
Expand All @@ -60,6 +62,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
private final LeakSafeOneWayBinder hostServiceBinder;
private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker;
private final InboundParcelablePolicy inboundParcelablePolicy;
private final BinderTransportSecurity.ShutdownListener shutdownListener;
mateusazis marked this conversation as resolved.
Show resolved Hide resolved

@GuardedBy("this")
private ServerListener listener;
Expand All @@ -70,18 +73,24 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
@GuardedBy("this")
private boolean shutdown;

/**
* @param shutdownListener represents resources that should be cleaned up once the server shuts
* down.
*/
public BinderServer(
AndroidComponentAddress listenAddress,
ObjectPool<ScheduledExecutorService> executorServicePool,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker,
InboundParcelablePolicy inboundParcelablePolicy) {
InboundParcelablePolicy inboundParcelablePolicy,
BinderTransportSecurity.ShutdownListener shutdownListener) {
this.listenAddress = listenAddress;
this.executorServicePool = executorServicePool;
this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(streamTracerFactories, "streamTracerFactories"));
this.serverPolicyChecker = checkNotNull(serverPolicyChecker, "serverPolicyChecker");
this.inboundParcelablePolicy = inboundParcelablePolicy;
this.shutdownListener = shutdownListener;
hostServiceBinder = new LeakSafeOneWayBinder(this);
}

Expand Down Expand Up @@ -125,6 +134,7 @@ public synchronized void shutdown() {
hostServiceBinder.detach();
listener.serverShutdown();
executorService = executorServicePool.returnObject(executorService);
shutdownListener.onShutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package io.grpc.binder.internal;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.Attributes;
import io.grpc.Internal;
import io.grpc.Metadata;
Expand All @@ -28,9 +31,12 @@
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

/**
* Manages security for an Android Service hosted gRPC server.
Expand All @@ -49,10 +55,11 @@ private BinderTransportSecurity() {}
* Install a security policy on an about-to-be created server.
*
* @param serverBuilder The ServerBuilder being used to create the server.
* @param executor The executor in which the authorization result will be handled.
*/
@Internal
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
serverBuilder.intercept(new ServerAuthInterceptor());
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder, Executor executor) {
serverBuilder.intercept(new ServerAuthInterceptor(executor));
}

/**
Expand All @@ -78,13 +85,68 @@ public static void attachAuthAttrs(
* Authentication state is fetched from the call attributes, inherited from the transport.
*/
private static final class ServerAuthInterceptor implements ServerInterceptor {

private final Executor executor;

ServerAuthInterceptor(Executor executor) {
this.executor = executor;
}

/**
* @param authStatusFuture a Future that is known to be complete, i.e.
* {@link ListenableFuture#isDone()} returns true.
*/
private <ReqT, RespT> ServerCall.Listener<ReqT> newServerCallListenerForPendingAuthResult(
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
ListenableFuture<Status> authStatusFuture,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
PendingAuthListener<ReqT, RespT> listener = new PendingAuthListener<>();
Futures.addCallback(
authStatusFuture,
new FutureCallback<Status>() {
@Override
public void onSuccess(Status authStatus) {
if (!authStatus.isOk()) {
call.close(authStatus, new Metadata());
return;
}

listener.startCall(call, headers, next);
}

@Override
public void onFailure(Throwable t) {
call.close(
Status.INTERNAL.withCause(t).withDescription("Authorization future failed"),
new Metadata());
}
}, executor);
return listener;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Status authStatus =
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
ListenableFuture<Status> authStatusFuture =
call.getAttributes()
.get(TRANSPORT_AUTHORIZATION_STATE)
.checkAuthorization(call.getMethodDescriptor());

// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code if the authorization result is already present.
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(authStatusFuture, call, headers, next);
}

Status authStatus;
try {
authStatus = Futures.getDone(authStatusFuture);
} catch (ExecutionException e) {
authStatus = Status.INTERNAL.withCause(e);
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
}

if (authStatus.isOk()) {
return next.startCall(call, headers);
} else {
Expand All @@ -95,13 +157,13 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
}

/**
* Maintaines the authorization state for a single transport instance. This class lives for the
* Maintains the authorization state for a single transport instance. This class lives for the
* lifetime of a single transport.
*/
private static final class TransportAuthorizationState {
private final int uid;
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, Status> serviceAuthorization;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;

TransportAuthorizationState(int uid, ServerPolicyChecker serverPolicyChecker) {
this.uid = uid;
Expand All @@ -111,32 +173,27 @@ private static final class TransportAuthorizationState {

/** Get whether we're authorized to make this call. */
@CheckReturnValue
Status checkAuthorization(MethodDescriptor<?, ?> method) {
ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
String serviceName = method.getServiceName();
// Only cache decisions if the method can be sampled for tracing,
// which is true for all generated methods. Otherwise, programatically
// created methods could casue this cahe to grow unbounded.
// which is true for all generated methods. Otherwise, programmatically
// created methods could cause this cache to grow unbounded.
boolean useCache = method.isSampledToLocalTracing();
Status authorization;
if (useCache) {
authorization = serviceAuthorization.get(serviceName);
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
if (authorization != null) {
return authorization;
}
}
try {
// TODO(10566): provide a synchronous version of "checkAuthorization" to avoid blocking the
// calling thread on the completion of the future.
authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName).get();
} catch (ExecutionException e) {
// Do not cache this failure since it may be transient.
return Status.fromThrowable(e);
} catch (InterruptedException e) {
// Do not cache this failure since it may be transient.
Thread.currentThread().interrupt();
return Status.CANCELLED.withCause(e);
}
// Under high load, this may trigger a large number of concurrent authorization checks that
// perform essentially the same work and have the potential of exhausting the resources they
// depend on. This was a non-issue in the past with synchronous policy checks due to the
// fixed-size nature of the thread pool this method runs under.
//
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
}
Expand Down Expand Up @@ -167,4 +224,12 @@ public interface ServerPolicyChecker {
*/
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName);
}

/**
* A listener invoked when the {@link io.grpc.binder.internal.BinderServer} shuts down, allowing
* resources to be potentially cleaned up.
*/
public interface ShutdownListener {
void onShutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server shutdown is actually kind of complex, so if we're going to use the prefix "on" (which I agree makes sense in a listener), we probably want to clarify that this is the server shutdown, and rename this to onServerShutdown().

FWIW, there's actually a bit more nuance here we'll need to handle, but let's leave that for a follow up CL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

By "nuance", do you mean code style-wise or are we actually missing something from the implementation? Because if it's the later, I'm not aware of what that is.

}
}
102 changes: 102 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/PendingAuthListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.grpc.binder.internal;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

/**
* A {@link ServerCall.Listener} that can be returned by a {@link io.grpc.ServerInterceptor} to
* asynchronously advance the gRPC pending resolving a possibly asynchronous security policy check.
*/
final class PendingAuthListener<ReqT, RespT> extends ServerCall.Listener<ReqT> {

private final ConcurrentLinkedQueue<ListenerConsumer<ReqT>> pendingSteps =
new ConcurrentLinkedQueue<>();
private final AtomicReference<ServerCall.Listener<ReqT>> delegateRef =
new AtomicReference<>(null);

PendingAuthListener() {}

void startCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall.Listener<ReqT> delegate;
try {
delegate = next.startCall(call, headers);
} catch (RuntimeException e) {
call.close(
Status
.INTERNAL
.withCause(e)
.withDescription("Failed to start server call after authorization check"),
new Metadata());
return;
}
delegateRef.set(delegate);
maybeRunPendingSteps();
}

/**
* Runs any enqueued step in this ServerCall listener as long as the authorization check is
* complete. Otherwise, no-op and returns immediately.
*/
private void maybeRunPendingSteps() {
@Nullable ServerCall.Listener<ReqT> delegate = delegateRef.get();
if (delegate == null) {
return;
}

// This section is synchronized so that no 2 threads may attempt to retrieve elements from the
// queue in order but end up executing the steps out of order.
synchronized (this) {
ListenerConsumer<ReqT> nextStep;
mateusazis marked this conversation as resolved.
Show resolved Hide resolved
while ((nextStep = pendingSteps.poll()) != null) {
nextStep.accept(delegate);
}
}
}

@Override
public void onCancel() {
pendingSteps.offer(ServerCall.Listener::onCancel);
maybeRunPendingSteps();
}

@Override
public void onComplete() {
pendingSteps.offer(ServerCall.Listener::onComplete);
maybeRunPendingSteps();
}

@Override
public void onHalfClose() {
pendingSteps.offer(ServerCall.Listener::onHalfClose);
maybeRunPendingSteps();
}

@Override
public void onMessage(ReqT message) {
pendingSteps.offer(delegate -> delegate.onMessage(message));
maybeRunPendingSteps();
}

@Override
public void onReady() {
pendingSteps.offer(ServerCall.Listener::onReady);
maybeRunPendingSteps();
}

/**
* Similar to Java8's {@link java.util.function.Consumer}, but redeclared in order to support
* Android SDK 21.
*/
private interface ListenerConsumer<ReqT> {
void accept(ServerCall.Listener<ReqT> listener);
}
}
Loading