Skip to content

Commit

Permalink
Introduce a JSON Stream parser for the reactive rest client
Browse files Browse the repository at this point in the history
The previous behavior generated a corrupted payload when the frame was cut in the middle of the JSON object. This new simple parser accumulates correctly.

Fix quarkusio#30044
  • Loading branch information
cescoffier committed Jan 9, 2023
1 parent d8650b8 commit e7eeaea
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

public class Message {
public String name;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand All @@ -16,8 +19,8 @@
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;

@Path("sse")
public class SseResource {
@Path("streams")
public class StreamResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down Expand Up @@ -120,4 +123,21 @@ public Multi<Message> multiStreamJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

/**
* Reproduce <a href="https://github.com/quarkusio/quarkus/issues/30044">#30044</a>.
*/
@Path("stream-json/multi/fast")
@GET
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiStreamJsonFast() {
List<UUID> ids = new ArrayList<>(5000);
for (int i = 0; i < 5000; i++) {
ids.add(UUID.randomUUID());
}
return Multi.createFrom().items(ids::stream)
.onItem().transform(id -> new Message(id.toString()))
.onOverflow().buffer(81920);
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -33,24 +33,24 @@
import io.quarkus.test.common.http.TestHTTPResource;
import io.smallrye.mutiny.Multi;

public class SseTestCase {
public class StreamTestCase {

@TestHTTPResource
URI uri;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SseResource.class, Message.class));
.addClasses(StreamResource.class, Message.class));

@Test
public void testSseFromSse() throws Exception {
testSse("sse");
testSse("streams");
}

@Test
public void testSseFromMulti() throws Exception {
testSse("sse/multi");
testSse("streams/multi");
}

private void testSse(String path) throws Exception {
Expand Down Expand Up @@ -81,12 +81,12 @@ public void accept(Throwable throwable) {

@Test
public void testMultiFromSse() {
testMulti("sse");
testMulti("streams");
}

@Test
public void testMultiFromMulti() {
testMulti("sse/multi");
testMulti("streams/multi");
}

private void testMulti(String path) {
Expand All @@ -99,24 +99,24 @@ private void testMulti(String path) {

@Test
public void testJsonMultiFromSse() {
testJsonMulti("sse/json");
testJsonMulti("sse/json2");
testJsonMulti("sse/blocking/json");
testJsonMulti("streams/json");
testJsonMulti("streams/json2");
testJsonMulti("streams/blocking/json");
}

@Test
public void testJsonMultiFromMulti() {
testJsonMulti("sse/json/multi");
testJsonMulti("streams/json/multi");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("sse/json/multi2");
testJsonMulti("streams/json/multi2");
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "sse/ndjson/multi")
when().get(uri.toString() + "streams/ndjson/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
Expand All @@ -127,7 +127,7 @@ public void testNdJsonMultiFromMulti() {

@Test
public void testStreamJsonMultiFromMulti() {
when().get(uri.toString() + "sse/stream-json/multi")
when().get(uri.toString() + "streams/stream-json/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
Expand All @@ -143,4 +143,19 @@ private void testJsonMulti(String path) {
List<Message> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
assertThat(list).extracting("name").containsExactly("hello", "stef");
}

/**
* Reproduce <a href="https://github.com/quarkusio/quarkus/issues/30044">#30044</a>.
*/
@Test
public void testStreamJsonMultiFromMultiFast() {
String payload = when().get(uri.toString() + "streams/stream-json/multi/fast")
.then().statusCode(HttpStatus.SC_OK)
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON))
.extract().response().asString();

// the payload include 5000 json objects
assertThat(payload.lines()).hasSize(5000)
.allSatisfy(s -> assertThat(s).matches("\\{\"name\":\".*\"}"));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.jboss.resteasy.reactive.client.impl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -18,6 +19,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.parsetools.RecordParser;

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

Expand Down Expand Up @@ -121,10 +123,13 @@ public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> respons
} else {
HttpClientResponse vertxResponse = restClientRequestContext.getVertxClientResponse();
if (!emitter.isCancelled()) {
// FIXME: this is probably not good enough
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(multiRequest, responseType, response, vertxResponse);
} else if (response.getStatus() == 200
&& RestMediaType.APPLICATION_STREAM_JSON_TYPE.isCompatible(response.getMediaType())) {
registerForJsonStream(multiRequest, restClientRequestContext, responseType, response,
vertxResponse);
} else {
// read stuff in chunks
registerForChunks(multiRequest, restClientRequestContext, responseType, response, vertxResponse);
Expand Down Expand Up @@ -267,4 +272,46 @@ private boolean startsWith(byte[] array, byte[] prefix) {
});
}

private <R> void registerForJsonStream(MultiRequest<? super R> multiRequest,
RestClientRequestContext restClientRequestContext,
GenericType<R> responseType,
ResponseImpl response,
HttpClientResponse vertxClientResponse) {
RecordParser parser = RecordParser.newDelimited("\n");
parser.handler(new Handler<Buffer>() {
@Override
public void handle(Buffer chunk) {

ByteArrayInputStream in = new ByteArrayInputStream(chunk.getBytes());
try {
R item = restClientRequestContext.readEntity(in, responseType, response.getMediaType(),
response.getMetadata());
multiRequest.emitter.emit(item);
} catch (IOException e) {
multiRequest.emitter.fail(e);
}
}
});
vertxClientResponse.exceptionHandler(t -> {
if (t == ConnectionBase.CLOSED_EXCEPTION) {
// we can ignore this one since we registered a closeHandler
} else {
multiRequest.emitter.fail(t);
}
});
vertxClientResponse.endHandler(new Handler<Void>() {
@Override
public void handle(Void c) {
multiRequest.complete();
}
});

vertxClientResponse.handler(parser);

// watch for user cancelling
multiRequest.onCancel(() -> {
vertxClientResponse.request().connection().close();
});
}

}

0 comments on commit e7eeaea

Please sign in to comment.