Skip to content

Commit

Permalink
gRPC: fix request context propagation - post review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed Jun 18, 2021
1 parent c62fe3b commit 56e7d9c
Showing 1 changed file with 72 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import org.jboss.logging.Logger;

import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.quarkus.arc.Arc;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.ManagedContext;
Expand Down Expand Up @@ -38,90 +36,97 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
state = reqContext.getState();
} else {
state = null;
log.warn("Request context already active when gRPC request started");
}

ServerCall.Listener<ReqT> delegate = next
.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
if (state != null) {
capturedVertxContext.runOnContext(event -> {
reqContext.deactivate();
reqContext.destroy(state);
});
}
}
}, headers);

// a gRPC service can return a StreamObserver<Messages.StreamingInputCallRequest> and instead of doing the work
// directly in the method body, do stuff that requires a request context in StreamObserver's methods
// let's propagate the request context to these methods:
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {

@Override
public void onMessage(ReqT message) {
activateContext();
try {
super.onMessage(message);
} finally {
deactivateContext();
try {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(call, headers)) {

@Override
public void onMessage(ReqT message) {
boolean activated = activateContext();
try {
super.onMessage(message);
} finally {
if (activated) {
deactivateContext();
}
}
}
}

@Override
public void onReady() {
activateContext();
try {
super.onReady();
} finally {
deactivateContext();
@Override
public void onReady() {
boolean activated = activateContext();
try {
super.onReady();
} finally {
if (activated) {
deactivateContext();
}
}
}
}

@Override
public void onHalfClose() {
activateContext();
try {
super.onHalfClose();
} finally {
deactivateContext();
@Override
public void onHalfClose() {
boolean activated = activateContext();
try {
super.onHalfClose();
} finally {
if (activated) {
deactivateContext();
}
}
}
}

@Override
public void onCancel() {
activateContext();
try {
super.onHalfClose();
} finally {
deactivateContext();
@Override
public void onCancel() {
boolean activated = activateContext();
try {
super.onCancel();
} finally {
if (activated) {
deactivateContext();
}
if (state != null) {
reqContext.destroy(state);
}
}
}
}

@Override
public void onComplete() {
activateContext();
try {
super.onComplete();
} finally {
deactivateContext();
@Override
public void onComplete() {
boolean activated = activateContext();
try {
super.onComplete();
} finally {
if (activated) {
deactivateContext();
}
if (state != null) {
reqContext.destroy(state);
}
}
}
}

private void deactivateContext() {
if (state != null) {
private void deactivateContext() {
reqContext.deactivate();
}
}

private void activateContext() {
if (state != null && !reqContext.isActive()) {
reqContext.activate(state);
private boolean activateContext() {
if (state != null && !reqContext.isActive()) {
reqContext.activate(state);
return true;
}
return false;
}
}
};
};
} finally {
reqContext.deactivate();
}
} else {
log.warn("Unable to activate the request scope - interceptor not called on the Vert.x event loop");
return next.startCall(call, headers);
Expand Down

0 comments on commit 56e7d9c

Please sign in to comment.