Skip to content

Commit

Permalink
Rest Client Reactive - passing headers for async calls
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed Apr 16, 2021
1 parent 29d257f commit aacd94d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void testParser(List<String> events, List<InboundSseEvent> expectedEvent
}

private void testParserWithBytes(List<byte[]> events, List<InboundSseEvent> expectedEvents) {
SseEventSourceImpl eventSource = new SseEventSourceImpl(null, 500, TimeUnit.MILLISECONDS);
SseEventSourceImpl eventSource = new SseEventSourceImpl(null, null, 500, TimeUnit.MILLISECONDS);
SseParser parser = eventSource.getSseParser();
CountDownLatch latch = new CountDownLatch(expectedEvents.size());
List<InboundSseEvent> receivedEvents = Collections.synchronizedList(new ArrayList<>(expectedEvents.size()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.quarkus.rest.client.reactive;

import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class AsyncHeaderTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Client.class, Resource.class)
.addAsResource(
new StringAsset(setUrlForClass(Client.class)),
"application.properties"));

@RestClient
Client client;

@Test
void shouldSendHeaderWithUni() {
String headerValue = "jaka piekna i dluga wartosc headera";
String result = client.uniGet(headerValue)
.await().atMost(Duration.ofSeconds(10));
assertThat(result).isEqualTo(String.format("passedHeader:%s", headerValue));
}

@Test
void shouldSendHeaderWithCompletionStage() throws ExecutionException, InterruptedException, TimeoutException {
String headerValue = "jaka piekna i dluga wartosc headera";
String result = client.completionStageGet(headerValue)
.toCompletableFuture().get(10, TimeUnit.SECONDS);
assertThat(result).isEqualTo(String.format("passedHeader:%s", headerValue));
}

@Path("/")
@RegisterRestClient
@Produces(MediaType.TEXT_PLAIN)
interface Client {
@GET
Uni<String> uniGet(@HeaderParam("some-header") String headerValue);

@GET
CompletionStage<String> completionStageGet(@HeaderParam("some-header") String headerValue);
}

@Path("/")
static class Resource {
@GET
public String get(@HeaderParam("some-header") String headerValue) {
return String.format("passedHeader:%s", headerValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Object proceed() throws IOException, WebApplicationException {
}
// Spec says to throw this
throw new ProcessingException(
"Request could not be mapped to type " + entityType);
"Response could not be mapped to type " + entityType);
} else {
return interceptors[index++].aroundReadFrom(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public CompletionStageRxInvoker rx() {
@Override
public <T extends RxInvoker> T rx(Class<T> clazz) {
if (clazz == MultiInvoker.class) {
return (T) new MultiInvoker(target);
return (T) new MultiInvoker(this);
} else if (clazz == UniInvoker.class) {
return (T) new UniInvoker(target);
return (T) new UniInvoker(this);
}
RxInvokerProvider<?> invokerProvider = requestSpec.configuration.getRxInvokerProvider(clazz);
if (invokerProvider != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker.MultiRequest;

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

private final WebTargetImpl target;
private final InvocationBuilderImpl invocationBuilder;

public MultiInvoker(WebTargetImpl target) {
this.target = target;
public MultiInvoker(InvocationBuilderImpl target) {
this.invocationBuilder = target;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -85,7 +84,7 @@ public void onCancel(Runnable onCancel) {

@Override
public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> responseType) {
AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx();
AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx();
// FIXME: backpressure setting?
return Multi.createFrom().emitter(emitter -> {
MultiRequest<R> multiRequest = new MultiRequest<>(emitter);
Expand Down Expand Up @@ -122,7 +121,8 @@ private <R> void registerForSse(MultiRequest<? super R> multiRequest,
// honestly, isn't reconnect contradictory with completion?
// FIXME: Reconnect settings?
// For now we don't want multi to reconnect
SseEventSourceImpl sseSource = new SseEventSourceImpl(target, Integer.MAX_VALUE, TimeUnit.SECONDS);
SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(),
invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS);
// FIXME: deal with cancellation
sseSource.register(event -> {
// DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Builder reconnectingEvery(long delay, TimeUnit unit) {

@Override
public SseEventSource build() {
return new SseEventSourceImpl((WebTargetImpl) endpoint, reconnectDelay, reconnectUnit);
return new SseEventSourceImpl((WebTargetImpl) endpoint, endpoint.request(), reconnectDelay, reconnectUnit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.sse.InboundSseEvent;
Expand All @@ -22,6 +23,7 @@ public class SseEventSourceImpl implements SseEventSource, Handler<Long> {
private long reconnectDelay;

private final WebTargetImpl webTarget;
private final Invocation.Builder invocationBuilder;
// this tracks user request to open/close
private volatile boolean isOpen;
// this tracks whether we have a connection open
Expand All @@ -35,7 +37,8 @@ public class SseEventSourceImpl implements SseEventSource, Handler<Long> {
private long timerId = -1;
private boolean receivedClientClose;

public SseEventSourceImpl(WebTargetImpl webTarget, long reconnectDelay, TimeUnit reconnectUnit) {
public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder,
long reconnectDelay, TimeUnit reconnectUnit) {
// tests set a null endpoint
Objects.requireNonNull(reconnectUnit);
if (reconnectDelay <= 0)
Expand All @@ -44,6 +47,7 @@ public SseEventSourceImpl(WebTargetImpl webTarget, long reconnectDelay, TimeUnit
this.reconnectDelay = reconnectDelay;
this.reconnectUnit = reconnectUnit;
this.sseParser = new SseParser(this);
this.invocationBuilder = invocationBuilder;
}

WebTargetImpl getWebTarget() {
Expand Down Expand Up @@ -83,7 +87,7 @@ private void connect() {
isInProgress = true;
// ignore previous client closes
receivedClientClose = false;
AsyncInvokerImpl invoker = (AsyncInvokerImpl) webTarget.request().rx();
AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx();
RestClientRequestContext restClientRequestContext = invoker.performRequestInternal("GET", null, null, false);
restClientRequestContext.getResult().handle((response, throwable) -> {
// errors during connection don't currently lead to a retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

public class UniInvoker extends AbstractRxInvoker<Uni<?>> {

private WebTargetImpl target;
private InvocationBuilderImpl invocationBuilder;

public UniInvoker(WebTargetImpl target) {
this.target = target;
public UniInvoker(InvocationBuilderImpl invocationBuilder) {
this.invocationBuilder = invocationBuilder;
}

@Override
public <R> Uni<R> method(String name, Entity<?> entity, GenericType<R> responseType) {
AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx();
AsyncInvokerImpl invoker = (AsyncInvokerImpl) invocationBuilder.rx();
return Uni.createFrom().completionStage(invoker.method(name, entity, responseType));
}

Expand Down

0 comments on commit aacd94d

Please sign in to comment.