() {
- @Override
- public void onStart() {
- request(1);
- }
-
- @Override
- public void onCompleted() {
- downstream.onCompleted();
- }
-
- @Override
- public void onError(Throwable e) {
- downstream.onError(e);
- }
-
- @Override
- public void onNext(E e) {
- if (downstream.isUnsubscribed()) {
- ReferenceCountUtil.release(e);
- } else {
- downstream.onNext(e);
- }
-
- request(1);
- }
- };
- }
-}
diff --git a/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java b/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java
index e99004d429..d17367f789 100644
--- a/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java
+++ b/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java
@@ -16,8 +16,7 @@
package com.hotels.styx.api;
import com.google.common.collect.ImmutableSet;
-import io.netty.buffer.Unpooled;
-import rx.Observable;
+import reactor.core.publisher.Flux;
import java.nio.charset.Charset;
import java.util.Collection;
@@ -45,6 +44,7 @@
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
import static com.hotels.styx.api.RequestCookie.decode;
import static com.hotels.styx.api.RequestCookie.encode;
+import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.lang.Integer.parseInt;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
@@ -315,9 +315,9 @@ public HttpRequest toStreamingRequest() {
.disableValidation();
if (this.body.length == 0) {
- return streamingBuilder.body(new StyxCoreObservable<>(Observable.empty())).build();
+ return streamingBuilder.body(new ByteStream(Flux.empty())).build();
} else {
- return streamingBuilder.body(StyxObservable.of(Unpooled.copiedBuffer(body))).build();
+ return streamingBuilder.body(new ByteStream(Flux.just(new Buffer(copiedBuffer(body))))).build();
}
}
diff --git a/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java b/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java
index 8b68842324..14d5902a74 100644
--- a/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java
+++ b/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java
@@ -17,7 +17,7 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
-import rx.Observable;
+import reactor.core.publisher.Flux;
import java.nio.charset.Charset;
import java.util.Collection;
@@ -32,10 +32,10 @@
import static com.hotels.styx.api.HttpHeaderNames.SET_COOKIE;
import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING;
import static com.hotels.styx.api.HttpHeaderValues.CHUNKED;
-import static com.hotels.styx.api.ResponseCookie.decode;
-import static com.hotels.styx.api.ResponseCookie.encode;
import static com.hotels.styx.api.HttpResponseStatus.OK;
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
+import static com.hotels.styx.api.ResponseCookie.decode;
+import static com.hotels.styx.api.ResponseCookie.encode;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.lang.Integer.parseInt;
import static java.util.Arrays.asList;
@@ -70,7 +70,7 @@ public class FullHttpResponse implements FullHttpMessage {
private final HttpHeaders headers;
private final byte[] body;
- FullHttpResponse(Builder builder) {
+ private FullHttpResponse(Builder builder) {
this.version = builder.version;
this.status = builder.status;
this.headers = builder.headers.build();
@@ -182,9 +182,9 @@ public boolean isRedirect() {
*/
public HttpResponse toStreamingResponse() {
if (this.body.length == 0) {
- return new HttpResponse.Builder(this, new StyxCoreObservable<>(Observable.empty())).build();
+ return new HttpResponse.Builder(this, new ByteStream(Flux.empty())).build();
} else {
- return new HttpResponse.Builder(this, StyxObservable.of(copiedBuffer(this.body))).build();
+ return new HttpResponse.Builder(this, new ByteStream(Flux.just(new Buffer(copiedBuffer(this.body))))).build();
}
}
diff --git a/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java b/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java
index 00d3a047c5..31100b8c37 100644
--- a/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java
+++ b/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java
@@ -16,9 +16,7 @@
package com.hotels.styx.api;
import com.google.common.collect.ImmutableSet;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import rx.Observable;
+import reactor.core.publisher.Flux;
import java.util.Collection;
import java.util.Collections;
@@ -26,11 +24,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl;
import static com.hotels.styx.api.HttpHeaderNames.CONNECTION;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.COOKIE;
@@ -49,11 +47,8 @@
import static com.hotels.styx.api.RequestCookie.decode;
import static com.hotels.styx.api.RequestCookie.encode;
import static io.netty.buffer.ByteBufUtil.getBytes;
-import static io.netty.buffer.Unpooled.compositeBuffer;
import static io.netty.buffer.Unpooled.copiedBuffer;
-import static io.netty.util.ReferenceCountUtil.release;
import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
@@ -70,7 +65,7 @@
*
* An {@code HttpRequest} object is immutable with respect to the request line
* attributes and HTTP headers. Once an instance is created, they cannot change.
- *
+ *
* An {@code HttpRequest} body is a byte buffer stream that can be consumed
* as sequence of asynchronous events. Once consumed, the stream is exhausted and
* can not be reused. Conceptually each {@code HttpRequest} object
@@ -78,21 +73,21 @@
* For example, a Styx Server implements a content producer for {@link HttpInterceptor}
* extensions. The producer receives data chunks from a network socket and publishes
* them to an appropriate content stream.
- *
+ *
* HTTP requests are created via {@code Builder} object, which can be created
* with static helper methods:
*
*
- * - {@code get}
- * - {@code head}
- * - {@code post}
- * - {@code put}
- * - {@code delete}
- * - {@code patch}
+ * - {@code get}
+ * - {@code head}
+ * - {@code post}
+ * - {@code put}
+ * - {@code delete}
+ * - {@code patch}
*
- *
+ *
* A builder can also be created with one of the {@code Builder} constructors.
- *
+ *
* A special method {@code newBuilder} creates a prepopulated {@code Builder}
* from the current request object. It is useful for transforming a request
* to another one my modifying one or more of its attributes.
@@ -103,7 +98,7 @@ public class HttpRequest implements StreamingHttpMessage {
private final HttpMethod method;
private final Url url;
private final HttpHeaders headers;
- private final StyxObservable body;
+ private final ByteStream body;
HttpRequest(Builder builder) {
this.id = builder.id == null ? randomUUID() : builder.id;
@@ -179,10 +174,9 @@ public static Builder patch(String uri) {
*
* @param uri URI
* @param body body
- * @param body type
* @return {@code this}
*/
- public static Builder post(String uri, StyxObservable body) {
+ public static Builder post(String uri, ByteStream body) {
return new Builder(POST, uri).body(body);
}
@@ -191,10 +185,9 @@ public static Builder post(String uri, StyxObservable body) {
*
* @param uri URI
* @param body body
- * @param body type
* @return {@code this}
*/
- public static Builder put(String uri, StyxObservable body) {
+ public static Builder put(String uri, ByteStream body) {
return new Builder(PUT, uri).body(body);
}
@@ -203,10 +196,9 @@ public static Builder put(String uri, StyxObservable body) {
*
* @param uri URI
* @param body body
- * @param body type
* @return {@code this}
*/
- public static Builder patch(String uri, StyxObservable body) {
+ public static Builder patch(String uri, ByteStream body) {
return new Builder(PATCH, uri).body(body);
}
@@ -239,7 +231,7 @@ public List headers(CharSequence name) {
* @return request body as a byte stream
*/
@Override
- public StyxObservable body() {
+ public ByteStream body() {
return body;
}
@@ -339,11 +331,11 @@ public Builder newBuilder() {
* Returns a {@link StyxObservable} that eventually produces a
* {@link FullHttpRequest}. The resulting full request object has the same
* request line, headers, and content as this request.
- *
+ *
* The content stream is aggregated asynchronously. The stream may be connected
* to a network socket or some other content producer. Once aggregated, a
* FullHttpRequest object is emitted on the returned {@link StyxObservable}.
- *
+ *
* A sole {@code maxContentBytes} argument is a backstop defence against excessively
* long content streams. The {@code maxContentBytes} should be set to a sensible
* value according to your application requirements and heap size. When the content
@@ -354,34 +346,17 @@ public Builder newBuilder() {
* @return a {@link StyxObservable}
*/
public StyxObservable toFullRequest(int maxContentBytes) {
- CompositeByteBuf byteBufs = compositeBuffer();
-
- return new StyxCoreObservable<>(
- ((StyxCoreObservable) body)
- .delegate()
- .lift(disableFlowControl())
- .doOnError(e -> byteBufs.release())
- .collect(() -> byteBufs, (composite, part) -> {
- long newSize = composite.readableBytes() + part.readableBytes();
-
- if (newSize > maxContentBytes) {
- release(composite);
- release(part);
-
- throw new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes));
- }
- composite.addComponent(part);
- composite.writerIndex(composite.writerIndex() + part.readableBytes());
- })
- .map(HttpRequest::decodeAndRelease)
- .map(decoded -> new FullHttpRequest.Builder(this, decoded).build()));
+ return StyxObservable.from(
+ body.aggregate(maxContentBytes)
+ .thenApply(it -> new FullHttpRequest.Builder(this, decodeAndRelease(it)).build())
+ );
}
- private static byte[] decodeAndRelease(CompositeByteBuf aggregate) {
+ private static byte[] decodeAndRelease(Buffer aggregate) {
try {
- return getBytes(aggregate);
+ return getBytes(aggregate.delegate());
} finally {
- aggregate.release();
+ aggregate.delegate().release();
}
}
@@ -429,7 +404,7 @@ public static final class Builder {
private Url url;
private HttpHeaders.Builder headers;
private HttpVersion version = HTTP_1_1;
- private StyxObservable body;
+ private ByteStream body;
/**
* Creates a new {@link Builder} object with default attributes.
@@ -437,14 +412,14 @@ public static final class Builder {
public Builder() {
this.url = Url.Builder.url("/").build();
this.headers = new HttpHeaders.Builder();
- this.body = new StyxCoreObservable<>(Observable.empty());
+ this.body = new ByteStream(Flux.empty());
}
/**
* Creates a new {@link Builder} with specified HTTP method and URI.
*
* @param method a HTTP method
- * @param uri a HTTP URI
+ * @param uri a HTTP URI
*/
public Builder(HttpMethod method, String uri) {
this();
@@ -455,10 +430,10 @@ public Builder(HttpMethod method, String uri) {
/**
* Creates a new {@link Builder} from an existing request with a new body content stream.
*
- * @param request a HTTP request object
+ * @param request a HTTP request object
* @param contentStream a body content stream
*/
- public Builder(HttpRequest request, StyxObservable contentStream) {
+ public Builder(HttpRequest request, ByteStream contentStream) {
this.id = request.id();
this.method = httpMethod(request.method().name());
this.url = request.url();
@@ -482,7 +457,7 @@ public Builder(HttpRequest request, StyxObservable contentStream) {
this.url = request.url();
this.version = request.version();
this.headers = request.headers().newBuilder();
- this.body = StyxCoreObservable.of(copiedBuffer(request.body()));
+ this.body = new ByteStream(Flux.just(new Buffer(copiedBuffer(request.body()))));
}
/**
@@ -501,11 +476,22 @@ public Builder uri(String uri) {
* @param content request body
* @return {@code this}
*/
- public Builder body(StyxObservable content) {
+ public Builder body(ByteStream content) {
this.body = content;
return this;
}
+ /**
+ * Transforms request body.
+ *
+ * @param transformation a Function from ByteStream to ByteStream.
+ * @return a HttpRequest builder with a transformed message body.
+ */
+ public Builder body(Function transformation) {
+ this.body = transformation.apply(this.body);
+ return this;
+ }
+
/**
* Sets the unique ID for this request.
*
diff --git a/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java b/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java
index b1e2f65c33..94f4496563 100644
--- a/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java
+++ b/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java
@@ -17,21 +17,17 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.util.ReferenceCountUtil;
-import rx.Observable;
+import reactor.core.publisher.Flux;
-import java.nio.charset.Charset;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.SET_COOKIE;
import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING;
@@ -43,11 +39,7 @@
import static com.hotels.styx.api.ResponseCookie.decode;
import static com.hotels.styx.api.ResponseCookie.encode;
import static io.netty.buffer.ByteBufUtil.getBytes;
-import static io.netty.buffer.Unpooled.compositeBuffer;
-import static io.netty.buffer.Unpooled.copiedBuffer;
-import static io.netty.util.ReferenceCountUtil.release;
import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
@@ -62,7 +54,7 @@
*
* An {@code HttpResponse} object is immutable with respect to the response
* attributes and HTTP headers. Once an instance is created, they cannot change.
- *
+ *
* An {@code HttpResponse} body is a byte buffer stream that can be consumed
* as sequence of asynchronous events. Once consumed, the stream is exhausted and
* can not be reused. Conceptually each {@code HttpResponse} object has an
@@ -70,16 +62,16 @@
* a Styx Server implements a content producer for {@link HttpInterceptor}
* extensions. The producer receives data chunks from a network socket and
* publishes them to an appropriate content stream.
- *
+ *
* HTTP responses are created via {@code Builder} object, which can be created
* with static helper methods:
*
*
- * - {@code response()}
- * - {@code response(HttpResponseStatus)}
- * - {@code response(HttpResponseStatus, StyxObservable)}
+ * - {@code response()}
+ * - {@code response(HttpResponseStatus)}
+ * - {@code response(HttpResponseStatus, StyxObservable)}
*
- *
+ *
* A builder can also be created with one of the {@code Builder} constructors.
*
* A special method {@code newBuilder} creates a prepopulated {@code Builder}
@@ -90,7 +82,7 @@ public class HttpResponse implements StreamingHttpMessage {
private final HttpVersion version;
private final HttpResponseStatus status;
private final HttpHeaders headers;
- private final StyxObservable body;
+ private final ByteStream body;
HttpResponse(Builder builder) {
this.version = builder.version;
@@ -125,7 +117,7 @@ public static Builder response(HttpResponseStatus status) {
* @param body response body
* @return a new builder
*/
- public static Builder response(HttpResponseStatus status, StyxObservable body) {
+ public static Builder response(HttpResponseStatus status, ByteStream body) {
return new Builder(status).body(body);
}
@@ -141,7 +133,7 @@ public HttpHeaders headers() {
* @return the response body as a byte stream
*/
@Override
- public StyxObservable body() {
+ public ByteStream body() {
return body;
}
@@ -185,11 +177,11 @@ public boolean isRedirect() {
* Returns a {@link StyxObservable} that eventually produces a
* {@link FullHttpResponse}. The resulting full response object has the same
* response line, headers, and content as this response.
- *
+ *
* The content stream is aggregated asynchronously. The stream may be connected
* to a network socket or some other content producer. Once aggregated, a
* FullHttpResponse object is emitted on the returned {@link StyxObservable}.
- *
+ *
* A sole {@code maxContentBytes} argument is a backstop defence against excessively
* long content streams. The {@code maxContentBytes} should be set to a sensible
* value according to your application requirements and heap size. When the content
@@ -200,40 +192,22 @@ public boolean isRedirect() {
* @return a {@link StyxObservable}
*/
public StyxObservable toFullResponse(int maxContentBytes) {
- CompositeByteBuf byteBufs = compositeBuffer();
-
- Observable delegate = ((StyxCoreObservable) body)
- .delegate()
- .lift(disableFlowControl())
- .doOnError(e -> byteBufs.release())
- .collect(() -> byteBufs, (composite, part) -> {
- long newSize = composite.readableBytes() + part.readableBytes();
-
- if (newSize > maxContentBytes) {
- release(composite);
- release(part);
-
- throw new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes));
- }
- composite.addComponent(part);
- composite.writerIndex(composite.writerIndex() + part.readableBytes());
- })
- .map(HttpResponse::decodeAndRelease)
- .map(decoded -> new FullHttpResponse.Builder(this, decoded)
- .disableValidation()
- .build());
-
- return new StyxCoreObservable<>(delegate);
+ return StyxObservable.from(body.aggregate(maxContentBytes))
+ .map(it -> new FullHttpResponse.Builder(this, decodeAndRelease(it))
+ .disableValidation()
+ .build()
+ );
}
- private static byte[] decodeAndRelease(CompositeByteBuf aggregate) {
+ private static byte[] decodeAndRelease(Buffer aggregate) {
try {
- return getBytes(aggregate);
+ return getBytes(aggregate.delegate());
} finally {
- aggregate.release();
+ aggregate.delegate().release();
}
}
+
/**
* Decodes "Set-Cookie" header values and returns them as set of {@link ResponseCookie} objects.
*
@@ -248,7 +222,7 @@ public Set cookies() {
*
* @param name cookie name
* @return an optional {@link ResponseCookie} value if corresponding cookie name is present,
- * or {@link Optional#empty} if not.
+ * or {@link Optional#empty} if not.
*/
public Optional cookie(String name) {
return cookies().stream()
@@ -292,14 +266,14 @@ public static final class Builder {
private HttpHeaders.Builder headers;
private HttpVersion version = HTTP_1_1;
private boolean validate = true;
- private StyxObservable body;
+ private ByteStream body;
/**
* Creates a new {@link Builder} object with default attributes.
*/
public Builder() {
this.headers = new HttpHeaders.Builder();
- this.body = new StyxCoreObservable<>(Observable.empty());
+ this.body = new ByteStream(Flux.empty());
}
/**
@@ -329,16 +303,16 @@ public Builder(HttpResponse response) {
* Creates a new {@link Builder} object from a response code and a content stream.
*
* Builder's response status line parameters and the HTTP headers are populated from
- * the given {@code response} object, but the content stream is set to {@code contentStream}.
+ * the given {@code response} object, but the content stream is set to {@code ByteStream}.
*
- * @param response a full response for which the builder is based on
- * @param contentStream a content byte stream
+ * @param response a full response for which the builder is based on
+ * @param byteStream a content byte stream
*/
- public Builder(FullHttpResponse response, StyxObservable contentStream) {
+ public Builder(FullHttpResponse response, ByteStream byteStream) {
this.status = statusWithCode(response.status().code());
this.version = httpVersion(response.version().toString());
this.headers = response.headers().newBuilder();
- this.body = contentStream;
+ this.body = requireNonNull(byteStream);
}
/**
@@ -358,20 +332,20 @@ public Builder status(HttpResponseStatus status) {
* @param content response body
* @return {@code this}
*/
- public Builder body(StyxObservable content) {
- this.body = content;
+ public Builder body(ByteStream content) {
+ this.body = requireNonNull(content);
return this;
}
/**
- * Sets the message body by encoding a {@link StyxObservable} of {@link String}s into bytes.
+ * Transforms request body.
*
- * @param contentObservable message body content.
- * @param charset character set
- * @return {@code this}
+ * @param transformation a Function from ByteStream to ByteStream.
+ * @return a HttpResponhse builder with a transformed message body.
*/
- public Builder body(StyxObservable contentObservable, Charset charset) {
- return body(contentObservable.map(content -> copiedBuffer(content, charset)));
+ public Builder body(Function transformation) {
+ this.body = requireNonNull(transformation.apply(this.body));
+ return this;
}
/**
@@ -554,12 +528,8 @@ public Builder removeHeader(CharSequence name) {
*/
// TODO: See https://github.com/HotelsDotCom/styx/issues/201
public Builder removeBody() {
- Observable delegate = ((StyxCoreObservable) body)
- .delegate()
- .doOnNext(ReferenceCountUtil::release)
- .ignoreElements();
-
- return body(new StyxCoreObservable<>(delegate));
+ this.body = body.drop();
+ return this;
}
@@ -581,8 +551,8 @@ public Builder headers(HttpHeaders headers) {
* method is invoked. Specifically that:
*
*
- * There is maximum of only one {@code Content-Length} header
- * The {@code Content-Length} header is zero or positive integer
+ * There is maximum of only one {@code Content-Length} header
+ * The {@code Content-Length} header is zero or positive integer
*
*
* @return {@code this}
@@ -597,16 +567,16 @@ public Builder disableValidation() {
*
* Validates and builds a {link HttpResponse} object. Object validation can be
* disabled with {@link this.disableValidation} method.
- *
+ *
* When validation is enabled (by default), ensures that:
*
*
- * There is maximum of only one {@code Content-Length} header
- * The {@code Content-Length} header is zero or positive integer
+ * There is maximum of only one {@code Content-Length} header
+ * The {@code Content-Length} header is zero or positive integer
*
*
- * @throws IllegalArgumentException when validation fails
* @return a new full response.
+ * @throws IllegalArgumentException when validation fails
*/
public HttpResponse build() {
if (validate) {
diff --git a/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java b/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java
index 8e838eb82a..2b0947cedb 100644
--- a/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java
+++ b/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java
@@ -15,12 +15,8 @@
*/
package com.hotels.styx.api;
-import io.netty.buffer.ByteBuf;
-import rx.Subscriber;
-
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_TYPE;
@@ -48,7 +44,7 @@ interface StreamingHttpMessage {
*
* @return the body
*/
- StyxObservable body();
+ ByteStream body();
/**
* Returns the value of the header with the specified {@code name}.
@@ -97,27 +93,4 @@ default boolean chunked() {
return HttpMessageSupport.chunked(headers());
}
- default CompletableFuture releaseContentBuffers() {
- CompletableFuture future = new CompletableFuture<>();
-
- ((StyxCoreObservable) body()).delegate()
- .subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- future.complete(true);
- }
-
- @Override
- public void onError(Throwable e) {
- future.completeExceptionally(e);
- }
-
- @Override
- public void onNext(ByteBuf byteBuf) {
- byteBuf.release();
- }
- });
-
- return future;
- }
}
diff --git a/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java b/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java
new file mode 100644
index 0000000000..17186c6caa
--- /dev/null
+++ b/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java
@@ -0,0 +1,157 @@
+/*
+ Copyright (C) 2013-2018 Expedia Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.hotels.styx.api;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
+import org.testng.annotations.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.publisher.TestPublisher;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+
+public class ByteStreamAggregatorTest {
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void allowsOnlyOneAggregation() {
+ Publisher upstream = Flux.just(new Buffer("x", UTF_8));
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100);
+
+ aggregator.apply();
+ aggregator.apply();
+ }
+
+ @Test
+ public void aggregatesZeroBuffers() throws ExecutionException, InterruptedException {
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.empty(), 100);
+
+ Buffer a = aggregator.apply().get();
+ assertThat(a.size(), is(0));
+ assertThat(new String(a.content(), UTF_8), is(""));
+ }
+
+ @Test
+ public void aggregatesOneBuffer() throws ExecutionException, InterruptedException {
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.just(new Buffer("x", UTF_8)), 100);
+
+ Buffer a = aggregator.apply().get();
+ assertThat(a.size(), is(1));
+ assertThat(new String(a.content(), UTF_8), is("x"));
+ }
+
+ @Test
+ public void aggregatesManyBuffers() throws ExecutionException, InterruptedException {
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.just(
+ new Buffer("x", UTF_8),
+ new Buffer("y", UTF_8)), 100);
+
+ Buffer a = aggregator.apply().get();
+ assertThat(a.size(), is(2));
+ assertThat(new String(a.content(), UTF_8), is("xy"));
+ }
+
+ @Test
+ public void aggregatesUpToNBytes() {
+ AtomicReference causeCapture = new AtomicReference<>(null);
+
+ Buffer a = new Buffer("aaabbb", UTF_8);
+ Buffer b = new Buffer("ccc", UTF_8);
+
+ TestPublisher upstream = TestPublisher.create();
+
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 8);
+
+ CompletableFuture future = aggregator.apply()
+ .exceptionally(cause -> {
+ causeCapture.set(cause);
+ throw new RuntimeException();
+ });
+
+ upstream.next(a);
+ upstream.next(b);
+
+ upstream.assertCancelled();
+
+ assertTrue(future.isCompletedExceptionally());
+ assertThat(causeCapture.get(), instanceOf(ContentOverflowException.class));
+
+ assertThat(a.delegate().refCnt(), is(0));
+ assertThat(b.delegate().refCnt(), is(0));
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void checkForNullSubscription() {
+ Publisher upstream = mock(Publisher.class);
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100);
+
+ aggregator.onSubscribe(null);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void allowsOnlyOneSubscription() {
+ Publisher upstream = mock(Publisher.class);
+ Subscription subscription1 = mock(Subscription.class);
+ Subscription subscription2 = mock(Subscription.class);
+
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100);
+ aggregator.onSubscribe(subscription1);
+
+ try {
+ aggregator.onSubscribe(subscription2);
+ } catch (IllegalStateException cause) {
+ verify(subscription2).cancel();
+ throw cause;
+ }
+ }
+
+ @Test
+ public void emitsErrors() {
+ AtomicReference causeCapture = new AtomicReference<>(null);
+
+ Buffer a = new Buffer("aaabbb", UTF_8);
+
+ TestPublisher upstream = TestPublisher.create();
+
+ ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 8);
+
+ CompletableFuture future = aggregator.apply()
+ .exceptionally(cause -> {
+ causeCapture.set(cause);
+ throw new RuntimeException();
+ });
+
+ upstream.next(a);
+ upstream.error(new RuntimeException("something broke"));
+
+ upstream.assertCancelled();
+
+ assertTrue(future.isCompletedExceptionally());
+ assertThat(causeCapture.get(), instanceOf(RuntimeException.class));
+ assertThat(causeCapture.get().getMessage(), is("something broke"));
+
+ assertThat(a.delegate().refCnt(), is(0));
+ }
+}
\ No newline at end of file
diff --git a/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java b/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java
new file mode 100644
index 0000000000..e285cf6783
--- /dev/null
+++ b/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java
@@ -0,0 +1,190 @@
+/*
+ Copyright (C) 2013-2018 Expedia Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.hotels.styx.api;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+
+public class ByteStreamTest {
+ private Buffer buf1;
+ private Buffer buf2;
+ private Buffer buf3;
+
+ @BeforeMethod
+ public void setUp() {
+ buf1 = new Buffer("a", UTF_8);
+ buf2 = new Buffer("b", UTF_8);
+ buf3 = new Buffer("c", UTF_8);
+ }
+
+ @Test
+ public void publishesContent() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ StepVerifier.create(stream)
+ .expectNext(buf1)
+ .expectNext(buf2)
+ .expectNext(buf3)
+ .verifyComplete();
+ }
+
+ @Test
+ public void supportsBackpressure() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ StepVerifier.create(stream, 0)
+ .expectSubscription()
+ .thenRequest(1)
+ .expectNext(buf1)
+ .thenRequest(2)
+ .expectNext(buf2, buf3)
+ .verifyComplete();
+ }
+
+ @Test
+ public void mapsContent() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ ByteStream mapped = stream.map(this::toUpperCase);
+
+ StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String))
+ .expectSubscription()
+ .expectNext("A", "B", "C")
+ .verifyComplete();
+ }
+
+ @Test
+ public void releasesRefcountForMappedBuffers() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ ByteStream mapped = stream.map(this::toUpperCase);
+
+ StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String))
+ .expectSubscription()
+ .expectNext("A", "B", "C")
+ .verifyComplete();
+
+ assertThat(buf1.delegate().refCnt(), is(0));
+ assertThat(buf2.delegate().refCnt(), is(0));
+ assertThat(buf3.delegate().refCnt(), is(0));
+ }
+
+ @Test
+ public void mapRetainsRefcountsForInlineBufferChanges() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ ByteStream mapped = stream.map(buf -> buf);
+
+ StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String))
+ .expectSubscription()
+ .expectNextCount(3)
+ .verifyComplete();
+
+ assertThat(buf1.delegate().refCnt(), is(1));
+ assertThat(buf2.delegate().refCnt(), is(1));
+ assertThat(buf3.delegate().refCnt(), is(1));
+ }
+
+ @Test
+ public void discardsContent() {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ ByteStream discarded = stream.drop();
+
+ StepVerifier.create(discarded)
+ .expectSubscription()
+ .verifyComplete();
+
+ assertThat(buf1.delegate().refCnt(), is(0));
+ assertThat(buf2.delegate().refCnt(), is(0));
+ assertThat(buf3.delegate().refCnt(), is(0));
+ }
+
+ @Test
+ public void aggregatesContent() throws ExecutionException, InterruptedException {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ Buffer aggregated = stream.aggregate(100).get();
+ assertThat(decodeUtf8String(aggregated), is("abc"));
+ }
+
+ @Test
+ public void contentAggregationOverflow() throws ExecutionException, InterruptedException {
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3));
+
+ Throwable cause = stream.aggregate(2)
+ .handle((result, throwable) -> throwable)
+ .get();
+
+ assertThat(cause, instanceOf(ContentOverflowException.class));
+ }
+
+
+ @Test
+ public void deliversAtEndOfStreamNotification() {
+ AtomicBoolean terminated = new AtomicBoolean();
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2))
+ .doOnEnd(maybeCause -> terminated.set(maybeCause == Optional.empty()));
+
+ StepVerifier.create(new ByteStream(stream), 0)
+ .thenRequest(1)
+ .expectNext(buf1)
+ .then(() -> assertThat(terminated.get(), is(false)))
+ .thenRequest(1)
+ .expectNext(buf2)
+ .then(() -> assertThat(terminated.get(), is(true)))
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void deliversAtEndOfStreamNotificationWhenTerminated() {
+ AtomicBoolean terminated = new AtomicBoolean();
+ ByteStream stream = new ByteStream(Flux.just(buf1, buf2).concatWith(Flux.error(new RuntimeException("bang!"))))
+ .doOnEnd(maybeCause -> terminated.set(maybeCause.isPresent()));
+
+ StepVerifier.create(new ByteStream(stream), 0)
+ .thenRequest(1)
+ .expectNext(buf1)
+ .then(() -> assertThat(terminated.get(), is(false)))
+ .thenRequest(1)
+ .expectNext(buf2)
+ .then(() -> assertThat(terminated.get(), is(true)))
+ .expectError()
+ .verify();
+ }
+
+ private String decodeUtf8String(Buffer buffer) {
+ return new String(buffer.content(), UTF_8);
+ }
+
+ private Buffer toUpperCase(Buffer buffer) {
+ return new Buffer(decodeUtf8String(buffer).toUpperCase(), UTF_8);
+ }
+
+}
diff --git a/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java b/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java
deleted file mode 100644
index aae857e33b..0000000000
--- a/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- Copyright (C) 2013-2018 Expedia Inc.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-package com.hotels.styx.api;
-
-import io.netty.buffer.ByteBuf;
-import org.testng.annotations.Test;
-import rx.observers.TestSubscriber;
-import rx.subjects.PublishSubject;
-
-import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl;
-import static io.netty.buffer.Unpooled.copiedBuffer;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static rx.Observable.just;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-
-public class FlowControlDisableOperatorTest {
- /*
- This test proves that when the subscriber requests back-pressure, but the FlowControlDisableOperator is applied,
- the events are delivered as if no back-pressure was requested.
- */
- @Test
- public void liftedObservableDisablesBackpressure() throws InterruptedException {
- TestSubscriber subscriber = new TestSubscriber<>(1);
-
- just("one", "two", "three")
- .lift(disableFlowControl())
- .subscribe(subscriber);
-
- subscriber.assertValues("one", "two", "three");
- }
-
- /*
- This test does not actually test the FlowControlDisableOperator, rather it is used to demonstrate the difference
- that the presence of the operator makes to the delivery of events.
- */
- @Test
- public void nonLiftedObservableCanBeSubscribedWithBackpressure() {
- TestSubscriber subscriber = new TestSubscriber<>(1);
-
- just("one", "two", "three")
- .subscribe(subscriber);
-
- subscriber.assertValues("one");
- subscriber.requestMore(1);
- subscriber.assertValues("one", "two");
- subscriber.requestMore(1);
- subscriber.assertValues("one", "two", "three");
- }
-
- @Test
- public void releasesReferenceCountsAfterDownstreamUnsubscribes() throws Exception {
- TestSubscriber subscriber = new TestSubscriber<>();
- PublishSubject source = PublishSubject.create();
-
- ByteBuf buf1 = copiedBuffer("abc", UTF_8);
- ByteBuf buf2 = copiedBuffer("abc", UTF_8);
- ByteBuf buf3 = copiedBuffer("abc", UTF_8);
-
- source.lift(disableFlowControl())
- .subscribe(subscriber);
-
- source.onNext(buf1);
- assertThat(buf1.refCnt(), is(1));
-
- subscriber.unsubscribe();
-
- source.onNext(buf2);
- assertThat(buf2.refCnt(), is(0));
-
- source.onNext(buf3);
- assertThat(buf3.refCnt(), is(0));
- }
-}
\ No newline at end of file
diff --git a/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java b/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java
index 1da4205415..2b072354f8 100644
--- a/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java
+++ b/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java
@@ -16,10 +16,10 @@
package com.hotels.styx.api;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import rx.observers.TestSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
import java.util.Optional;
@@ -30,13 +30,13 @@
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpHeaderNames.COOKIE;
import static com.hotels.styx.api.HttpHeaderNames.HOST;
-import static com.hotels.styx.api.Url.Builder.url;
-import static com.hotels.styx.api.RequestCookie.requestCookie;
import static com.hotels.styx.api.HttpMethod.DELETE;
import static com.hotels.styx.api.HttpMethod.GET;
import static com.hotels.styx.api.HttpMethod.POST;
import static com.hotels.styx.api.HttpVersion.HTTP_1_0;
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
+import static com.hotels.styx.api.RequestCookie.requestCookie;
+import static com.hotels.styx.api.Url.Builder.url;
import static com.hotels.styx.support.matchers.IsOptional.isAbsent;
import static com.hotels.styx.support.matchers.IsOptional.isValue;
import static com.hotels.styx.support.matchers.MapMatcher.isMap;
@@ -86,13 +86,9 @@ public void convertsToStreamingHttpRequest() throws Exception {
public void convertsToStreamingHttpRequestWithEmptyBody(FullHttpRequest fullRequest) {
HttpRequest streaming = fullRequest.toStreamingRequest();
- TestSubscriber subscriber = TestSubscriber.create(0);
- subscriber.requestMore(1);
-
- ((StyxCoreObservable) streaming.body()).delegate().subscribe(subscriber);
-
- assertThat(subscriber.getOnNextEvents().size(), is(0));
- subscriber.assertCompleted();
+ StepVerifier.create(streaming.body())
+ .expectComplete()
+ .verify();
}
// We want to ensure that these are all considered equivalent
@@ -234,12 +230,13 @@ public void requestBodyCannotBeChangedViaStreamingRequest() {
.body("original", UTF_8)
.build();
- ByteBuf byteBuf = ((StyxCoreObservable) original.toStreamingRequest().body())
- .delegate()
- .toBlocking()
- .first();
-
- byteBuf.array()[0] = 'A';
+ Flux.from(original.toStreamingRequest()
+ .body()
+ .map(buffer -> {
+ buffer.delegate().array()[0] = 'A';
+ return buffer;
+ }))
+ .subscribe();
assertThat(original.bodyAs(UTF_8), is("original"));
}
diff --git a/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java b/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java
index fc33891648..ec351704e6 100644
--- a/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java
+++ b/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java
@@ -15,11 +15,10 @@
*/
package com.hotels.styx.api;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import rx.observers.TestSubscriber;
+import reactor.core.publisher.Flux;
import java.util.Optional;
import java.util.Set;
@@ -28,8 +27,6 @@
import static com.hotels.styx.api.FullHttpResponse.response;
import static com.hotels.styx.api.HttpHeader.header;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
-import static com.hotels.styx.api.ResponseCookie.responseCookie;
-import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable;
import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY;
import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST;
import static com.hotels.styx.api.HttpResponseStatus.CREATED;
@@ -41,6 +38,8 @@
import static com.hotels.styx.api.HttpResponseStatus.SEE_OTHER;
import static com.hotels.styx.api.HttpResponseStatus.TEMPORARY_REDIRECT;
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
+import static com.hotels.styx.api.ResponseCookie.responseCookie;
+import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable;
import static com.hotels.styx.support.matchers.IsOptional.isAbsent;
import static com.hotels.styx.support.matchers.IsOptional.isValue;
import static java.nio.charset.StandardCharsets.UTF_16;
@@ -322,16 +321,14 @@ public void overridesContent() {
}
@Test(dataProvider = "emptyBodyResponses")
- public void convertsToStreamingHttpResponseWithEmptyBody(FullHttpResponse response) {
+ public void convertsToStreamingHttpResponseWithEmptyBody(FullHttpResponse response) throws ExecutionException, InterruptedException {
HttpResponse streaming = response.toStreamingResponse();
- TestSubscriber subscriber = TestSubscriber.create(0);
- subscriber.requestMore(1);
-
- ((StyxCoreObservable) streaming.body()).delegate().subscribe(subscriber);
+ byte[] result = streaming.body().aggregate(1000)
+ .get()
+ .content();
- assertThat(subscriber.getOnNextEvents().size(), is(0));
- subscriber.assertCompleted();
+ assertThat(result.length, is(0));
}
// We want to ensure that these are all considered equivalent
@@ -432,29 +429,30 @@ public void responseBodyCannotBeChangedViaStreamingMessage() {
.body("original", UTF_8)
.build();
- ByteBuf byteBuf = ((StyxCoreObservable) original.toStreamingResponse().body())
- .delegate()
- .toBlocking()
- .first();
-
- byteBuf.array()[0] = 'A';
+ Flux.from(original.toStreamingResponse()
+ .body()
+ .map(buf -> {
+ buf.delegate().array()[0] = 'A';
+ return buf;
+ }))
+ .subscribe();
assertThat(original.bodyAs(UTF_8), is("original"));
}
- @Test(expectedExceptions = io.netty.util.IllegalReferenceCountException.class)
+ @Test
public void toFullResponseReleasesOriginalRefCountedBuffers() throws ExecutionException, InterruptedException {
- ByteBuf content = Unpooled.copiedBuffer("original", UTF_8);
+ Buffer content = new Buffer(Unpooled.copiedBuffer("original", UTF_8));
HttpResponse original = HttpResponse.response(OK)
- .body(StyxObservable.of(content))
+ .body(new ByteStream(Flux.just(content)))
.build();
original.toFullResponse(100)
.asCompletableFuture()
.get();
- content.array()[0] = 'A';
+ assertThat(content.delegate().refCnt(), is(0));
}
@Test
diff --git a/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java b/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java
index 18204b3c95..7c1d74002b 100644
--- a/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java
+++ b/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java
@@ -16,10 +16,9 @@
package com.hotels.styx.api;
import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import reactor.core.publisher.Flux;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
@@ -41,7 +40,6 @@
import static com.hotels.styx.support.matchers.IsOptional.isAbsent;
import static com.hotels.styx.support.matchers.IsOptional.isValue;
import static com.hotels.styx.support.matchers.MapMatcher.isMap;
-import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -77,19 +75,19 @@ public void decodesToFullHttpRequest() throws Exception {
assertThat(full.body(), is(bytes("foobar")));
}
- @Test(expectedExceptions = io.netty.util.IllegalReferenceCountException.class)
+ @Test
public void toFullRequestReleasesOriginalReferenceCountedBuffers() throws ExecutionException, InterruptedException {
- ByteBuf content = Unpooled.copiedBuffer("original", UTF_8);
+ Buffer content = new Buffer("original", UTF_8);
HttpRequest original = HttpRequest.get("/foo")
- .body(StyxObservable.of(content))
+ .body(new ByteStream(Flux.just(content)))
.build();
FullHttpRequest fullRequest = original.toFullRequest(100)
.asCompletableFuture()
.get();
- content.array()[0] = 'A';
+ assertThat(content.delegate().refCnt(), is(0));
assertThat(fullRequest.bodyAs(UTF_8), is("original"));
}
@@ -108,12 +106,12 @@ public void encodesToStreamingHttpRequestWithEmptyBody(HttpRequest streamingRequ
private Object[][] emptyBodyRequests() {
return new Object[][]{
{get("/foo/bar").build()},
- {post("/foo/bar", StyxCoreObservable.empty()).build()},
+ {post("/foo/bar", new ByteStream(Flux.empty())).build()},
};
}
@Test
- public void createsARequestWithDefaultValues() throws Exception {
+ public void createsARequestWithDefaultValues() {
HttpRequest request = get("/index").build();
assertThat(request.version(), is(HTTP_1_1));
assertThat(request.url().toString(), is("/index"));
@@ -295,7 +293,7 @@ public void canRemoveAHeader() {
@Test
public void shouldSetsContentLengthForNonStreamingBodyMessage() {
- assertThat(put("/home").body(StyxObservable.of(copiedBuffer("Hello", UTF_8))).build().header(CONTENT_LENGTH), isAbsent());
+ assertThat(put("/home").body(new ByteStream(Flux.just(new Buffer("Hello", UTF_8)))).build().header(CONTENT_LENGTH), isAbsent());
}
@Test
@@ -428,16 +426,24 @@ public void removesCookiesInSameBuilder() {
assertThat(r1.cookie("x"), isAbsent());
}
- private static StyxObservable body(String... contents) {
- return StyxObservable.from(Stream.of(contents)
- .map(content -> copiedBuffer(content, UTF_8))
- .collect(toList()));
+ private static ByteStream body(String... contents) {
+
+ return new ByteStream(
+ Flux.fromIterable(
+ Stream.of(contents)
+ .map(content -> new Buffer(content, UTF_8))
+ .collect(toList())));
}
- private static String bytesToString(StyxObservable body) throws ExecutionException, InterruptedException {
- return body.reduce((byteBuf, result) -> result + byteBuf.toString(UTF_8), "")
- .asCompletableFuture()
- .get();
+ private static String bytesToString(ByteStream body) {
+ try {
+ return new String(body.aggregate(100000).get().content(), UTF_8);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
private static byte[] bytes(String content) {
diff --git a/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java b/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java
index 0aa5efd7ef..f85f355e74 100644
--- a/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java
+++ b/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java
@@ -15,20 +15,16 @@
*/
package com.hotels.styx.api;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import reactor.core.publisher.Flux;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static com.hotels.styx.api.HttpHeader.header;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
-import static com.hotels.styx.api.ResponseCookie.responseCookie;
-import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable;
import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY;
import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST;
import static com.hotels.styx.api.HttpResponseStatus.CREATED;
@@ -41,11 +37,12 @@
import static com.hotels.styx.api.HttpResponseStatus.TEMPORARY_REDIRECT;
import static com.hotels.styx.api.HttpVersion.HTTP_1_0;
import static com.hotels.styx.api.HttpVersion.HTTP_1_1;
+import static com.hotels.styx.api.ResponseCookie.responseCookie;
+import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable;
import static com.hotels.styx.support.matchers.IsOptional.isAbsent;
import static com.hotels.styx.support.matchers.IsOptional.isValue;
-import static java.nio.charset.StandardCharsets.UTF_16;
+import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -54,16 +51,16 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
public class HttpResponseTest {
+
@Test
public void encodesToFullHttpResponse() throws Exception {
HttpResponse response = response(CREATED)
.version(HTTP_1_0)
.header("HeaderName", "HeaderValue")
.cookies(responseCookie("CookieName", "CookieValue").build())
- .body(body("foo", "bar"))
+ .body(new ByteStream(Flux.just("foo", "bar").map(it -> new Buffer(copiedBuffer(it, UTF_8)))))
.build();
FullHttpResponse full = response.toFullResponse(0x1000)
@@ -92,7 +89,7 @@ public void encodesToFullHttpResponseWithEmptyBody(HttpResponse response) throws
private Object[][] emptyBodyResponses() {
return new Object[][]{
{response().build()},
- {response().body(StyxCoreObservable.empty()).build()},
+ {response().body(new ByteStream(Flux.empty())).build()},
};
}
@@ -169,16 +166,22 @@ public void canRemoveAHeader() {
}
@Test
- public void canRemoveResponseBody() {
+ public void canRemoveResponseBody() throws ExecutionException, InterruptedException {
+ Buffer originalContent = new Buffer("I'm going to get removed.", UTF_8);
+
HttpResponse response = response(NO_CONTENT)
- .body(body("shouldn't be here"))
+ .body(new ByteStream(Flux.just(originalContent)))
.build();
- HttpResponse shouldClearBody = response.newBuilder()
- .body(null)
- .build();
+ FullHttpResponse fullResponse = response.newBuilder()
+ .body(ByteStream::drop)
+ .build()
+ .toFullResponse(1000)
+ .asCompletableFuture()
+ .get();
- assertThat(shouldClearBody.body(), is(nullValue()));
+ assertThat(fullResponse.body().length, is(0));
+ assertThat(originalContent.delegate().refCnt(), is(0));
}
@Test
@@ -294,28 +297,6 @@ public void rejectsInvalidContentLength() {
.build();
}
- @Test
- public void encodesBodyWithCharset() throws InterruptedException, ExecutionException, TimeoutException {
- StyxObservable o = StyxObservable.of("Hello, World!");
-
- FullHttpResponse responseUtf8 = response()
- .body(o, UTF_8)
- .build()
- .toFullResponse(1_000_000)
- .asCompletableFuture()
- .get(1, SECONDS);
-
- FullHttpResponse responseUtf16 = response()
- .body(o, UTF_16)
- .build()
- .toFullResponse(1_000_000)
- .asCompletableFuture()
- .get(1, SECONDS);
-
- assertThat(responseUtf8.body(), is("Hello, World!".getBytes(UTF_8)));
- assertThat(responseUtf16.body(), is("Hello, World!".getBytes(UTF_16)));
- }
-
@Test
public void addsCookies() {
HttpResponse response = response()
@@ -380,17 +361,15 @@ private static HttpResponse.Builder response(HttpResponseStatus status) {
return HttpResponse.response(status);
}
- private static StyxObservable body(String... contents) {
- return StyxObservable.from(Stream.of(contents)
- .map(content -> Unpooled.copiedBuffer(content, UTF_8))
- .collect(toList()));
+ private static ByteStream body(String... contents) {
+ return new ByteStream(Flux.fromIterable(
+ Stream.of(contents)
+ .map(content -> new Buffer(copiedBuffer(content, UTF_8)))
+ .collect(toList())));
}
- private static String bytesToString(StyxObservable body) throws Exception {
- return body.reduce((buf, result) -> result + buf.toString(UTF_8), "")
- .asCompletableFuture()
- .get();
-
+ private static String bytesToString(ByteStream body) throws Exception {
+ return new String(body.aggregate(100000).get().content(), UTF_8);
}
private static byte[] bytes(String s) {
diff --git a/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java b/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java
index 65349a15a8..ef6c350a5c 100644
--- a/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java
+++ b/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java
@@ -15,7 +15,6 @@
*/
package com.hotels.styx.api;
-import com.hotels.styx.api.ServerCookieEncoder;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import org.testng.annotations.Test;
diff --git a/components/client/pom.xml b/components/client/pom.xml
index 589ddb6e82..707bad034d 100644
--- a/components/client/pom.xml
+++ b/components/client/pom.xml
@@ -77,6 +77,12 @@
org.mockito
mockito-all