-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for duplicated context in the gRPC clients #23731
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,12 @@ | |
import io.quarkus.grpc.GrpcClient; | ||
import io.quarkus.grpc.server.services.HelloService; | ||
import io.quarkus.test.QuarkusUnitTest; | ||
import io.smallrye.mutiny.Uni; | ||
import io.vertx.core.impl.ContextInternal; | ||
import io.vertx.core.impl.EventLoopContext; | ||
import io.vertx.core.impl.WorkerContext; | ||
import io.vertx.mutiny.core.Context; | ||
import io.vertx.mutiny.core.Vertx; | ||
|
||
public class MutinyClientInjectionTest { | ||
|
||
|
@@ -35,6 +41,12 @@ public class MutinyClientInjectionTest { | |
public void test() { | ||
String neo = service.invoke("neo-mutiny"); | ||
assertThat(neo).matches("Hello neo-mutiny"); | ||
|
||
String fromIO = service.invokeFromIoThread("neo-io"); | ||
assertThat(fromIO).matches("Hello neo-io"); | ||
|
||
String fromDC = service.invokeFromDuplicatedContext("neo-dc"); | ||
assertThat(fromDC).matches("Hello neo-dc"); | ||
} | ||
|
||
@ApplicationScoped | ||
|
@@ -43,11 +55,43 @@ static class MyConsumer { | |
@GrpcClient("hello-service") | ||
Greeter service; | ||
|
||
@Inject | ||
Vertx vertx; | ||
|
||
public String invoke(String s) { | ||
return service.sayHello(HelloRequest.newBuilder().setName(s).build()) | ||
.map(HelloReply::getMessage) | ||
.invoke(() -> assertThat(Vertx.currentContext()).isNull()) | ||
.await().atMost(Duration.ofSeconds(5)); | ||
} | ||
|
||
public String invokeFromIoThread(String s) { | ||
Context context = vertx.getOrCreateContext(); | ||
return Uni.createFrom().<String> emitter(e -> { | ||
context.runOnContext(() -> { | ||
service.sayHello(HelloRequest.newBuilder().setName(s).build()) | ||
.map(HelloReply::getMessage) | ||
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context)) | ||
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class)) | ||
.subscribe().with(e::complete, e::fail); | ||
}); | ||
}).await().atMost(Duration.ofSeconds(5)); | ||
} | ||
|
||
public String invokeFromDuplicatedContext(String s) { | ||
Context root = vertx.getOrCreateContext(); | ||
ContextInternal duplicate = ((ContextInternal) root.getDelegate()).duplicate(); | ||
return Uni.createFrom().<String> emitter(e -> { | ||
duplicate.runOnContext(x -> { | ||
service.sayHello(HelloRequest.newBuilder().setName(s).build()) | ||
.map(HelloReply::getMessage) | ||
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()) | ||
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class) | ||
.isEqualTo(duplicate)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's where we verify we are called on the same duplicated context |
||
.subscribe().with(e::complete, e::fail); | ||
}); | ||
}).await().atMost(Duration.ofSeconds(5)); | ||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,18 +27,37 @@ public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized | |
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, | ||
CallOptions callOptions, Channel next) { | ||
|
||
boolean isOnEventLoop = Context.isOnEventLoopThread(); | ||
Context context = Vertx.currentContext(); | ||
|
||
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { | ||
|
||
@Override | ||
public void start(Listener<RespT> responseListener, Metadata headers) { | ||
|
||
Context context = Vertx.currentContext(); | ||
boolean isOnIOThread = context != null && Context.isOnEventLoopThread(); | ||
|
||
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { | ||
|
||
@Override | ||
public void onReady() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was under impression that we want to invoke client stuff always with duplicated context but we stick to only propagating the context, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
if (isOnIOThread) { | ||
context.runOnContext(unused -> super.onReady()); | ||
} else { | ||
super.onReady(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onHeaders(Metadata headers) { | ||
if (isOnIOThread) { | ||
context.runOnContext(unused -> super.onHeaders(headers)); | ||
} else { | ||
super.onHeaders(headers); | ||
} | ||
} | ||
|
||
@Override | ||
public void onMessage(RespT message) { | ||
if (isOnEventLoop && context != null) { | ||
if (isOnIOThread) { | ||
context.runOnContext(unused -> super.onMessage(message)); | ||
} else { | ||
super.onMessage(message); | ||
|
@@ -47,7 +66,7 @@ public void onMessage(RespT message) { | |
|
||
@Override | ||
public void onClose(Status status, Metadata trailers) { | ||
if (isOnEventLoop && context != null) { | ||
if (isOnIOThread) { | ||
context.runOnContext(unused -> super.onClose(status, trailers)); | ||
} else { | ||
super.onClose(status, trailers); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be good to also test if client interceptors are invoked with the same duplicated context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test checks that it does call you on the same duplicated context. Not the interceptor, but the actual items.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems we didn't understand each other :)
OTOH, it's unlikely interceptor invocation would hijack context somehow, so maybe we're good without a test I wrote a bout