Skip to content

Commit

Permalink
Merge pull request #22996 from stuartwdouglas/22973
Browse files Browse the repository at this point in the history
Fix response truncation
  • Loading branch information
famod authored Jan 19, 2022
2 parents 2c47a0f + 6a7a0f7 commit 5d653f8
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.resteasy.jackson;

import java.util.HashMap;
import java.util.Map;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Produces(MediaType.APPLICATION_JSON)
@Path("/large")
public class LargeResource {

@GET
@Path("/bufmult")
public Map<String, String> hello() {
Map<String, String> ret = new HashMap<>();
for (int i = 0; i < 830; ++i) {
if (i == 0) {
//hack to make this exactly 2 * 8191 bytes long, as tested by trial and error
ret.put("key00", "value" + i);
} else {
ret.put("key" + i, "value" + i);
}
}
return ret;
}

@GET
@Path("/huge")
public Map<String, String> huge() {
Map<String, String> ret = new HashMap<>();
for (int i = 0; i < 1280; ++i) {
ret.put("key" + i, "value" + i);
}
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.resteasy.jackson;

import static org.hamcrest.Matchers.equalTo;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class LargeResponseTest {

@RegisterExtension
static QuarkusUnitTest TEST = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(LargeResource.class));

@Test
public void testLargeResponseMultipleOfBuffer() {
RestAssured.get("/large/bufmult").then()
.statusCode(200)
.body("key500", equalTo("value500"));
}

@Test
public void testLargeResponse() {
RestAssured.get("/large/huge").then()
.statusCode(200)
.body("key500", equalTo("value500"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import javax.ws.rs.core.HttpHeaders;

import org.jboss.resteasy.spi.AsyncOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class VertxOutputStream extends AsyncOutputStream {

Expand Down Expand Up @@ -160,36 +162,38 @@ public CompletionStage<Void> asyncWrite(final byte[] b, final int off, final int
return ret;
}

CompletionStage<Void> ret = CompletableFuture.completedFuture(null);
int bufferSize = allocator.getBufferSize();
int bufferCount = len / bufferSize;
int remainder = len % bufferSize;
if (remainder != 0) {
bufferCount = bufferCount + 1;
}
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);

if (bufferCount == 1) {
if (pooledBuffer == null) {
pooledBuffer = allocator.allocateBuffer();
}
pooledBuffer.writeBytes(b, 0, len);
} else {
for (int i = 0; i < bufferCount - 1; i++) {
int bufferIndex = i;
ret = ret.thenCompose(v -> {
ByteBuf tmpBuf = null;
if ((bufferIndex == 0) && ((pooledBuffer != null))) {
tmpBuf = pooledBuffer;
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(b, off, len);
if (pooledBuffer == null) {
pooledBuffer = allocator.allocateBuffer();
}
pooledBuffer.writeBytes(wrappedBuffer, Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes()));
if (pooledBuffer.writableBytes() == 0) {
CompletableFuture<Void> cf = new CompletableFuture<>();
ret = cf;
ByteBuf filled = pooledBuffer;
pooledBuffer = null;
response.writeNonBlocking(filled, false).whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void unused, Throwable throwable) {
if (throwable != null) {
cf.completeExceptionally(throwable);
return;
}
if (tmpBuf == null) {
tmpBuf = allocator.allocateBuffer();
pooledBuffer = allocator.allocateBuffer();
pooledBuffer.writeBytes(wrappedBuffer,
Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes()));

if (pooledBuffer.writableBytes() == 0) {
ByteBuf filled = pooledBuffer;
pooledBuffer = null;
response.writeNonBlocking(filled, false).whenComplete(this);
} else {
cf.complete(null);
}
tmpBuf.writeBytes(b, bufferIndex * bufferSize, bufferSize);
return response.writeNonBlocking(tmpBuf, false);
});
}
pooledBuffer = allocator.allocateBuffer();
pooledBuffer.writeBytes(b, (bufferCount - 1) * bufferSize, remainder);
}
});
}

return ret.thenCompose(v -> asyncUpdateWritten(len));
Expand Down

0 comments on commit 5d653f8

Please sign in to comment.