From aacd94de4202212d1cf623da2bca24a52ffcb45c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Fri, 16 Apr 2021 13:01:41 +0200 Subject: [PATCH] Rest Client Reactive - passing headers for async calls fixes #16577 --- .../deployment/test/sse/SseParserTest.java | 2 +- .../rest/client/reactive/AsyncHeaderTest.java | 75 +++++++++++++++++++ .../ClientReaderInterceptorContextImpl.java | 2 +- .../client/impl/InvocationBuilderImpl.java | 4 +- .../reactive/client/impl/MultiInvoker.java | 12 +-- .../impl/SseEventSourceBuilderImpl.java | 2 +- .../client/impl/SseEventSourceImpl.java | 8 +- .../reactive/client/impl/UniInvoker.java | 8 +- 8 files changed, 96 insertions(+), 17 deletions(-) create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/AsyncHeaderTest.java diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseParserTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseParserTest.java index 6dd57db0cbbaf..9836e508e0ea7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseParserTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseParserTest.java @@ -158,7 +158,7 @@ private void testParser(List events, List expectedEvent } private void testParserWithBytes(List events, List expectedEvents) { - SseEventSourceImpl eventSource = new SseEventSourceImpl(null, 500, TimeUnit.MILLISECONDS); + SseEventSourceImpl eventSource = new SseEventSourceImpl(null, null, 500, TimeUnit.MILLISECONDS); SseParser parser = eventSource.getSseParser(); CountDownLatch latch = new CountDownLatch(expectedEvents.size()); List receivedEvents = Collections.synchronizedList(new ArrayList<>(expectedEvents.size())); diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/AsyncHeaderTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/AsyncHeaderTest.java new file mode 100644 index 0000000000000..b190184464404 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/AsyncHeaderTest.java @@ -0,0 +1,75 @@ +package io.quarkus.rest.client.reactive; + +import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +public class AsyncHeaderTest { + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(Client.class, Resource.class) + .addAsResource( + new StringAsset(setUrlForClass(Client.class)), + "application.properties")); + + @RestClient + Client client; + + @Test + void shouldSendHeaderWithUni() { + String headerValue = "jaka piekna i dluga wartosc headera"; + String result = client.uniGet(headerValue) + .await().atMost(Duration.ofSeconds(10)); + assertThat(result).isEqualTo(String.format("passedHeader:%s", headerValue)); + } + + @Test + void shouldSendHeaderWithCompletionStage() throws ExecutionException, InterruptedException, TimeoutException { + String headerValue = "jaka piekna i dluga wartosc headera"; + String result = client.completionStageGet(headerValue) + .toCompletableFuture().get(10, TimeUnit.SECONDS); + assertThat(result).isEqualTo(String.format("passedHeader:%s", headerValue)); + } + + @Path("/") + @RegisterRestClient + @Produces(MediaType.TEXT_PLAIN) + interface Client { + @GET + Uni uniGet(@HeaderParam("some-header") String headerValue); + + @GET + CompletionStage completionStageGet(@HeaderParam("some-header") String headerValue); + } + + @Path("/") + static class Resource { + @GET + public String get(@HeaderParam("some-header") String headerValue) { + return String.format("passedHeader:%s", headerValue); + } + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientReaderInterceptorContextImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientReaderInterceptorContextImpl.java index 86641275cc520..55815186fa959 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientReaderInterceptorContextImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientReaderInterceptorContextImpl.java @@ -60,7 +60,7 @@ public Object proceed() throws IOException, WebApplicationException { } // Spec says to throw this throw new ProcessingException( - "Request could not be mapped to type " + entityType); + "Response could not be mapped to type " + entityType); } else { return interceptors[index++].aroundReadFrom(this); } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InvocationBuilderImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InvocationBuilderImpl.java index 489b85dd0d76d..0d7a76cab7e95 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InvocationBuilderImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/InvocationBuilderImpl.java @@ -175,9 +175,9 @@ public CompletionStageRxInvoker rx() { @Override public T rx(Class clazz) { if (clazz == MultiInvoker.class) { - return (T) new MultiInvoker(target); + return (T) new MultiInvoker(this); } else if (clazz == UniInvoker.class) { - return (T) new UniInvoker(target); + return (T) new UniInvoker(this); } RxInvokerProvider invokerProvider = requestSpec.configuration.getRxInvokerProvider(clazz); if (invokerProvider != null) { diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index b6cd2aabd79b5..a94327152dae6 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -13,14 +13,13 @@ import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.jboss.resteasy.reactive.client.impl.MultiInvoker.MultiRequest; public class MultiInvoker extends AbstractRxInvoker> { - private final WebTargetImpl target; + private final InvocationBuilderImpl invocationBuilder; - public MultiInvoker(WebTargetImpl target) { - this.target = target; + public MultiInvoker(InvocationBuilderImpl target) { + this.invocationBuilder = target; } @SuppressWarnings("unchecked") @@ -85,7 +84,7 @@ public void onCancel(Runnable onCancel) { @Override public Multi method(String name, Entity entity, GenericType responseType) { - AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx(); + AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx(); // FIXME: backpressure setting? return Multi.createFrom().emitter(emitter -> { MultiRequest multiRequest = new MultiRequest<>(emitter); @@ -122,7 +121,8 @@ private void registerForSse(MultiRequest multiRequest, // honestly, isn't reconnect contradictory with completion? // FIXME: Reconnect settings? // For now we don't want multi to reconnect - SseEventSourceImpl sseSource = new SseEventSourceImpl(target, Integer.MAX_VALUE, TimeUnit.SECONDS); + SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(), + invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS); // FIXME: deal with cancellation sseSource.register(event -> { // DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceBuilderImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceBuilderImpl.java index a55f743844a68..0033a1c89d69a 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceBuilderImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceBuilderImpl.java @@ -31,7 +31,7 @@ public Builder reconnectingEvery(long delay, TimeUnit unit) { @Override public SseEventSource build() { - return new SseEventSourceImpl((WebTargetImpl) endpoint, reconnectDelay, reconnectUnit); + return new SseEventSourceImpl((WebTargetImpl) endpoint, endpoint.request(), reconnectDelay, reconnectUnit); } } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java index 87f525ff3a5be..2d7557c0817aa 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java @@ -10,6 +10,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import javax.ws.rs.client.Invocation; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.sse.InboundSseEvent; @@ -22,6 +23,7 @@ public class SseEventSourceImpl implements SseEventSource, Handler { private long reconnectDelay; private final WebTargetImpl webTarget; + private final Invocation.Builder invocationBuilder; // this tracks user request to open/close private volatile boolean isOpen; // this tracks whether we have a connection open @@ -35,7 +37,8 @@ public class SseEventSourceImpl implements SseEventSource, Handler { private long timerId = -1; private boolean receivedClientClose; - public SseEventSourceImpl(WebTargetImpl webTarget, long reconnectDelay, TimeUnit reconnectUnit) { + public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder, + long reconnectDelay, TimeUnit reconnectUnit) { // tests set a null endpoint Objects.requireNonNull(reconnectUnit); if (reconnectDelay <= 0) @@ -44,6 +47,7 @@ public SseEventSourceImpl(WebTargetImpl webTarget, long reconnectDelay, TimeUnit this.reconnectDelay = reconnectDelay; this.reconnectUnit = reconnectUnit; this.sseParser = new SseParser(this); + this.invocationBuilder = invocationBuilder; } WebTargetImpl getWebTarget() { @@ -83,7 +87,7 @@ private void connect() { isInProgress = true; // ignore previous client closes receivedClientClose = false; - AsyncInvokerImpl invoker = (AsyncInvokerImpl) webTarget.request().rx(); + AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx(); RestClientRequestContext restClientRequestContext = invoker.performRequestInternal("GET", null, null, false); restClientRequestContext.getResult().handle((response, throwable) -> { // errors during connection don't currently lead to a retry diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/UniInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/UniInvoker.java index ab96cb7ab9546..aebc78c36ccc1 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/UniInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/UniInvoker.java @@ -7,15 +7,15 @@ public class UniInvoker extends AbstractRxInvoker> { - private WebTargetImpl target; + private InvocationBuilderImpl invocationBuilder; - public UniInvoker(WebTargetImpl target) { - this.target = target; + public UniInvoker(InvocationBuilderImpl invocationBuilder) { + this.invocationBuilder = invocationBuilder; } @Override public Uni method(String name, Entity entity, GenericType responseType) { - AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx(); + AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx(); return Uni.createFrom().completionStage(invoker.method(name, entity, responseType)); }