Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for duplicated context in the gRPC clients #23731

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

public class MutinyClientInjectionTest {

Expand All @@ -35,6 +41,12 @@ public class MutinyClientInjectionTest {
public void test() {
String neo = service.invoke("neo-mutiny");
assertThat(neo).matches("Hello neo-mutiny");

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to also test if client interceptors are invoked with the same duplicated context

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test checks that it does call you on the same duplicated context. Not the interceptor, but the actual items.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we didn't understand each other :)
OTOH, it's unlikely interceptor invocation would hijack context somehow, so maybe we're good without a test I wrote a bout

String fromIO = service.invokeFromIoThread("neo-io");
assertThat(fromIO).matches("Hello neo-io");

String fromDC = service.invokeFromDuplicatedContext("neo-dc");
assertThat(fromDC).matches("Hello neo-dc");
}

@ApplicationScoped
Expand All @@ -43,11 +55,43 @@ static class MyConsumer {
@GrpcClient("hello-service")
Greeter service;

@Inject
Vertx vertx;

public String invoke(String s) {
return service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNull())
.await().atMost(Duration.ofSeconds(5));
}

public String invokeFromIoThread(String s) {
Context context = vertx.getOrCreateContext();
return Uni.createFrom().<String> emitter(e -> {
context.runOnContext(() -> {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.subscribe().with(e::complete, e::fail);
});
}).await().atMost(Duration.ofSeconds(5));
}

public String invokeFromDuplicatedContext(String s) {
Context root = vertx.getOrCreateContext();
ContextInternal duplicate = ((ContextInternal) root.getDelegate()).duplicate();
return Uni.createFrom().<String> emitter(e -> {
duplicate.runOnContext(x -> {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's where we verify we are called on the same duplicated context

.subscribe().with(e::complete, e::fail);
});
}).await().atMost(Duration.ofSeconds(5));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.WorkerContext;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

public class MutinyStubInjectionTest {
Expand All @@ -46,6 +50,9 @@ public void test() {

neo = service.invokeFromIoThread("neo-io");
assertThat(neo).startsWith("Hello neo-io").contains("vert.x");

neo = service.invokeFromDuplicatedContext("neo-dc");
assertThat(neo).startsWith("Hello neo-dc").contains("vert.x");
}

@ApplicationScoped
Expand All @@ -60,15 +67,35 @@ static class MyConsumer {
public String invoke(String s) {
return service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNull())
.map(r -> r + " " + Thread.currentThread().getName())
.await().atMost(Duration.ofSeconds(5));
}

public String invokeFromIoThread(String s) {
Context context = vertx.getOrCreateContext();
return Uni.createFrom().<String> emitter(e -> {
context.runOnContext(() -> {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext()).isNotNull().isEqualTo(context))
.invoke(() -> assertThat(Vertx.currentContext().getDelegate()).isInstanceOf(EventLoopContext.class))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
});
}).await().atMost(Duration.ofSeconds(5));
}

public String invokeFromDuplicatedContext(String s) {
Context root = vertx.getOrCreateContext();
ContextInternal duplicate = ((ContextInternal) root.getDelegate()).duplicate();
return Uni.createFrom().<String> emitter(e -> {
vertx.runOnContext(() -> {
duplicate.runOnContext(x -> {
service.sayHello(HelloRequest.newBuilder().setName(s).build())
.map(HelloReply::getMessage)
.invoke(() -> assertThat(Vertx.currentContext().getDelegate())
.isNotInstanceOf(EventLoopContext.class).isNotInstanceOf(WorkerContext.class)
.isEqualTo(duplicate))
.map(r -> r + " " + Thread.currentThread().getName())
.subscribe().with(e::complete, e::fail);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
import io.quarkus.grpc.runtime.config.SslClientConfig;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.util.ClassPathUtils;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.stork.Stork;

@SuppressWarnings({ "OptionalIsPresent", "Convert2Lambda" })
@SuppressWarnings({ "OptionalIsPresent" })
public class Channels {

private static final Logger LOGGER = Logger.getLogger(Channels.class.getName());
Expand Down Expand Up @@ -127,6 +128,10 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto

NettyChannelBuilder builder = NettyChannelBuilder
.forTarget(target)
// clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
// thread the messages should be processed.
.directExecutor() // will use I/O thread - must not be blocked.
.offloadExecutor(Infrastructure.getDefaultExecutor())
.defaultLoadBalancingPolicy(loadBalancingPolicy)
.flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW))
.keepAliveWithoutCalls(config.keepAliveWithoutCalls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,37 @@ public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {

boolean isOnEventLoop = Context.isOnEventLoopThread();
Context context = Vertx.currentContext();

return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {

Context context = Vertx.currentContext();
boolean isOnIOThread = context != null && Context.isOnEventLoopThread();

super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {

@Override
public void onReady() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under impression that we want to invoke client stuff always with duplicated context but we stick to only propagating the context, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
The problem is that if we start using a duplicated context, we cannot block anymore, which would break the blocking clients. So I just propagate if there is a context.

if (isOnIOThread) {
context.runOnContext(unused -> super.onReady());
} else {
super.onReady();
}
}

@Override
public void onHeaders(Metadata headers) {
if (isOnIOThread) {
context.runOnContext(unused -> super.onHeaders(headers));
} else {
super.onHeaders(headers);
}
}

@Override
public void onMessage(RespT message) {
if (isOnEventLoop && context != null) {
if (isOnIOThread) {
context.runOnContext(unused -> super.onMessage(message));
} else {
super.onMessage(message);
Expand All @@ -47,7 +66,7 @@ public void onMessage(RespT message) {

@Override
public void onClose(Status status, Metadata trailers) {
if (isOnEventLoop && context != null) {
if (isOnIOThread) {
context.runOnContext(unused -> super.onClose(status, trailers));
} else {
super.onClose(status, trailers);
Expand Down