diff --git a/components/api/pom.xml b/components/api/pom.xml index 370392638e..e774f58d18 100644 --- a/components/api/pom.xml +++ b/components/api/pom.xml @@ -26,6 +26,28 @@ rxjava + + io.reactivex + rxjava-reactive-streams + + + + + io.projectreactor + reactor-core + + + + io.projectreactor + reactor-test + test + + + + org.reactivestreams + reactive-streams + + org.hdrhistogram HdrHistogram diff --git a/components/api/src/main/java/com/hotels/styx/api/Buffer.java b/components/api/src/main/java/com/hotels/styx/api/Buffer.java new file mode 100644 index 0000000000..a8cfc75e13 --- /dev/null +++ b/components/api/src/main/java/com/hotels/styx/api/Buffer.java @@ -0,0 +1,73 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.Charset; + +import static io.netty.buffer.Unpooled.copiedBuffer; +import static java.util.Objects.requireNonNull; + +/** + * A byte buffer. + * + */ +public final class Buffer { + private final ByteBuf delegate; + + Buffer(ByteBuf byteBuf) { + this.delegate = requireNonNull(byteBuf); + } + + /** + * Creates a new Buffer from {@link String} content with specified character encoding. + * + * @param content content + * @param charset desired character encoding + */ + public Buffer(String content, Charset charset) { + this(copiedBuffer(content, charset)); + } + + /** + * Returns a size of the Buffer in bytes. + * @return a size in bytes + */ + public int size() { + return delegate.readableBytes(); + } + + /** + * Returns buffer content as array of bytes. + * + * @return a byte array + */ + public byte[] content() { + byte[] bytes = new byte[delegate.readableBytes()]; + delegate.getBytes(delegate.readerIndex(), bytes); + return bytes; + } + + /** + * The underlying Netty ByteBuf. + * + * @return a Netty ByteBuf object + */ + ByteBuf delegate() { + return delegate; + } +} diff --git a/components/api/src/main/java/com/hotels/styx/api/ByteStream.java b/components/api/src/main/java/com/hotels/styx/api/ByteStream.java new file mode 100644 index 0000000000..bff9b18529 --- /dev/null +++ b/components/api/src/main/java/com/hotels/styx/api/ByteStream.java @@ -0,0 +1,139 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +/** + * A stream of Styx byte {@link Buffer} objects constituting a HTTP message body. + * + * This {@code ByteStream} class implements a reactive streams {@link Publisher} interface, + * therefore being interoperable with other conforming libraries such as Reactor and + * Rx Java 2.0. + * + * The class provides a set of operations to transform and inspect the byte stream. + * + * The class also provides methods for consuming the stream. + ** + */ +public class ByteStream implements Publisher { + private final Publisher stream; + + /** + * Create a new {@code ByteStream} from a reactive streams {@link Publisher}. + * + * @param stream a reactive streams {@link Publisher} + */ + public ByteStream(Publisher stream) { + this.stream = requireNonNull(stream); + } + + /** + * Transform the stream by performing a mapping operation on each {@link Buffer} object. + * + * The mapping operation automatically maintains the @{link Buffer} reference counts as + * follows: + * + *
    + *
  • When the mapping function returns a new {@link Buffer} instance, the reference count for + * the old one is automatically decremented.
  • + *
  • When the mapping function modifies the {@link Buffer} in place, returning the same instance + * back, the reference count is unchanged.
  • + *
+ * + * @param mapping a mapping function + * + * @return a new, mapped {@code ByteStream} object + */ + public ByteStream map(Function mapping) { + return new ByteStream(Flux.from(stream).map(releaseOldBuffers(mapping))); + } + + private static Function releaseOldBuffers(Function mapping) { + return buffer -> { + Buffer buffer2 = mapping.apply(buffer); + if (buffer != buffer2) { + buffer.delegate().release(); + } + return buffer2; + }; + } + + /** + * Transform the stream by dropping all {@link Buffer} objects. + * + * The {@code drop} returns a new {@code ByteStream} object with all upstream + * buffers removed. The {@code drop} automatically decrements the reference + * counts for each dropped {@link Buffer}. + * + * @return an empty {@link ByteStream} + */ + public ByteStream drop() { + return new ByteStream(Flux.from(stream) + .doOnNext(buffer -> buffer.delegate().release()) + .filter(buffer -> false)); + } + + /** + * Run a provided action at the end of the byte stream. + * + * The provided action must accept an {@code Optional} argument, + * which is be set to {@code Optional.empty} if this stream finished successfully, + * or an {@code Optional.of(cause)} when this stream terminated with an error. + * + * @param action an action function + * + * @return an unmodified {@code ByteStream} with an action function attached + */ + public ByteStream doOnEnd(Consumer> action) { + return new ByteStream(Flux.from(this.stream) + .doOnError(cause -> action.accept(Optional.of(cause))) + .doOnComplete(() -> action.accept(Optional.empty())) + ); + } + + /** + * Consumes the stream by collecting it into an aggregate {@link Buffer} object. + * + * The aggregate {@link Buffer} object must be released after use. + * + * @param maxContentBytes maximum size for the aggregated buffer + * @return a future of aggregated buffer + */ + public CompletableFuture aggregate(int maxContentBytes) { + return new ByteStreamAggregator(this.stream, maxContentBytes) + .apply(); + } + + /** + * Consume the {@link ByteStream} by providing a reactive streams {@link Subscriber}. + * + * @param subscriber a reactive streams {@link Subscriber} + */ + @Override + public void subscribe(Subscriber subscriber) { + stream.subscribe(subscriber); + } +} diff --git a/components/api/src/main/java/com/hotels/styx/api/ByteStreamAggregator.java b/components/api/src/main/java/com/hotels/styx/api/ByteStreamAggregator.java new file mode 100644 index 0000000000..f713afdb96 --- /dev/null +++ b/components/api/src/main/java/com/hotels/styx/api/ByteStreamAggregator.java @@ -0,0 +1,91 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import io.netty.buffer.CompositeByteBuf; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.netty.buffer.Unpooled.compositeBuffer; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +class ByteStreamAggregator implements Subscriber { + private final Publisher upstream; + private final int maxSize; + private final CompletableFuture future = new CompletableFuture<>(); + private final AtomicBoolean active = new AtomicBoolean(); + private final CompositeByteBuf aggregated = compositeBuffer(); + private Subscription subscription; + + ByteStreamAggregator(Publisher upstream, int maxSize) { + this.upstream = requireNonNull(upstream); + this.maxSize = maxSize; + } + + public CompletableFuture apply() { + if (active.compareAndSet(false, true)) { + this.upstream.subscribe(this); + return future; + } else { + throw new IllegalStateException("ByteStreamAggregator may only be started once."); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + if (this.subscription == null) { + this.subscription = subscription; + this.subscription.request(Long.MAX_VALUE); + } else { + subscription.cancel(); + throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance."); + } + } + + @Override + public void onNext(Buffer part) { + long newSize = aggregated.readableBytes() + part.size(); + + if (newSize > maxSize) { + part.delegate().release(); + aggregated.release(); + subscription.cancel(); + this.future.completeExceptionally( + new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxSize))); + } else { + aggregated.addComponent(part.delegate()); + aggregated.writerIndex(aggregated.writerIndex() + part.size()); + } + } + + @Override + public void onError(Throwable cause) { + aggregated.release(); + subscription.cancel(); + future.completeExceptionally(cause); + } + + @Override + public void onComplete() { + future.complete(new Buffer(aggregated)); + } + +} diff --git a/components/api/src/main/java/com/hotels/styx/api/FlowControlDisableOperator.java b/components/api/src/main/java/com/hotels/styx/api/FlowControlDisableOperator.java deleted file mode 100644 index 03ef3684dd..0000000000 --- a/components/api/src/main/java/com/hotels/styx/api/FlowControlDisableOperator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.api; - -import io.netty.util.ReferenceCountUtil; -import rx.Observable; -import rx.Subscriber; - -/** - * Requests as many event from upstream as possible (i.e. disables backpressure). - * - * @param event type - */ -final class FlowControlDisableOperator implements Observable.Operator { - public static FlowControlDisableOperator disableFlowControl() { - return new FlowControlDisableOperator(); - } - - private FlowControlDisableOperator() { - } - - @Override - public Subscriber call(Subscriber downstream) { - return new Subscriber() { - @Override - public void onStart() { - request(1); - } - - @Override - public void onCompleted() { - downstream.onCompleted(); - } - - @Override - public void onError(Throwable e) { - downstream.onError(e); - } - - @Override - public void onNext(E e) { - if (downstream.isUnsubscribed()) { - ReferenceCountUtil.release(e); - } else { - downstream.onNext(e); - } - - request(1); - } - }; - } -} diff --git a/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java b/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java index e99004d429..d17367f789 100644 --- a/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java +++ b/components/api/src/main/java/com/hotels/styx/api/FullHttpRequest.java @@ -16,8 +16,7 @@ package com.hotels.styx.api; import com.google.common.collect.ImmutableSet; -import io.netty.buffer.Unpooled; -import rx.Observable; +import reactor.core.publisher.Flux; import java.nio.charset.Charset; import java.util.Collection; @@ -45,6 +44,7 @@ import static com.hotels.styx.api.HttpVersion.HTTP_1_1; import static com.hotels.styx.api.RequestCookie.decode; import static com.hotels.styx.api.RequestCookie.encode; +import static io.netty.buffer.Unpooled.copiedBuffer; import static java.lang.Integer.parseInt; import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; @@ -315,9 +315,9 @@ public HttpRequest toStreamingRequest() { .disableValidation(); if (this.body.length == 0) { - return streamingBuilder.body(new StyxCoreObservable<>(Observable.empty())).build(); + return streamingBuilder.body(new ByteStream(Flux.empty())).build(); } else { - return streamingBuilder.body(StyxObservable.of(Unpooled.copiedBuffer(body))).build(); + return streamingBuilder.body(new ByteStream(Flux.just(new Buffer(copiedBuffer(body))))).build(); } } diff --git a/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java b/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java index 8b68842324..14d5902a74 100644 --- a/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java +++ b/components/api/src/main/java/com/hotels/styx/api/FullHttpResponse.java @@ -17,7 +17,7 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; -import rx.Observable; +import reactor.core.publisher.Flux; import java.nio.charset.Charset; import java.util.Collection; @@ -32,10 +32,10 @@ import static com.hotels.styx.api.HttpHeaderNames.SET_COOKIE; import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING; import static com.hotels.styx.api.HttpHeaderValues.CHUNKED; -import static com.hotels.styx.api.ResponseCookie.decode; -import static com.hotels.styx.api.ResponseCookie.encode; import static com.hotels.styx.api.HttpResponseStatus.OK; import static com.hotels.styx.api.HttpVersion.HTTP_1_1; +import static com.hotels.styx.api.ResponseCookie.decode; +import static com.hotels.styx.api.ResponseCookie.encode; import static io.netty.buffer.Unpooled.copiedBuffer; import static java.lang.Integer.parseInt; import static java.util.Arrays.asList; @@ -70,7 +70,7 @@ public class FullHttpResponse implements FullHttpMessage { private final HttpHeaders headers; private final byte[] body; - FullHttpResponse(Builder builder) { + private FullHttpResponse(Builder builder) { this.version = builder.version; this.status = builder.status; this.headers = builder.headers.build(); @@ -182,9 +182,9 @@ public boolean isRedirect() { */ public HttpResponse toStreamingResponse() { if (this.body.length == 0) { - return new HttpResponse.Builder(this, new StyxCoreObservable<>(Observable.empty())).build(); + return new HttpResponse.Builder(this, new ByteStream(Flux.empty())).build(); } else { - return new HttpResponse.Builder(this, StyxObservable.of(copiedBuffer(this.body))).build(); + return new HttpResponse.Builder(this, new ByteStream(Flux.just(new Buffer(copiedBuffer(this.body))))).build(); } } diff --git a/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java b/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java index 00d3a047c5..31100b8c37 100644 --- a/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java +++ b/components/api/src/main/java/com/hotels/styx/api/HttpRequest.java @@ -16,9 +16,7 @@ package com.hotels.styx.api; import com.google.common.collect.ImmutableSet; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import rx.Observable; +import reactor.core.publisher.Flux; import java.util.Collection; import java.util.Collections; @@ -26,11 +24,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import static com.google.common.base.Objects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; -import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl; import static com.hotels.styx.api.HttpHeaderNames.CONNECTION; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; import static com.hotels.styx.api.HttpHeaderNames.COOKIE; @@ -49,11 +47,8 @@ import static com.hotels.styx.api.RequestCookie.decode; import static com.hotels.styx.api.RequestCookie.encode; import static io.netty.buffer.ByteBufUtil.getBytes; -import static io.netty.buffer.Unpooled.compositeBuffer; import static io.netty.buffer.Unpooled.copiedBuffer; -import static io.netty.util.ReferenceCountUtil.release; import static java.lang.Integer.parseInt; -import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -70,7 +65,7 @@ *

* An {@code HttpRequest} object is immutable with respect to the request line * attributes and HTTP headers. Once an instance is created, they cannot change. - * + *

* An {@code HttpRequest} body is a byte buffer stream that can be consumed * as sequence of asynchronous events. Once consumed, the stream is exhausted and * can not be reused. Conceptually each {@code HttpRequest} object @@ -78,21 +73,21 @@ * For example, a Styx Server implements a content producer for {@link HttpInterceptor} * extensions. The producer receives data chunks from a network socket and publishes * them to an appropriate content stream. - * + *

* HTTP requests are created via {@code Builder} object, which can be created * with static helper methods: * *

    - *
  • {@code get}
  • - *
  • {@code head}
  • - *
  • {@code post}
  • - *
  • {@code put}
  • - *
  • {@code delete}
  • - *
  • {@code patch}
  • + *
  • {@code get}
  • + *
  • {@code head}
  • + *
  • {@code post}
  • + *
  • {@code put}
  • + *
  • {@code delete}
  • + *
  • {@code patch}
  • *
- * + *

* A builder can also be created with one of the {@code Builder} constructors. - * + *

* A special method {@code newBuilder} creates a prepopulated {@code Builder} * from the current request object. It is useful for transforming a request * to another one my modifying one or more of its attributes. @@ -103,7 +98,7 @@ public class HttpRequest implements StreamingHttpMessage { private final HttpMethod method; private final Url url; private final HttpHeaders headers; - private final StyxObservable body; + private final ByteStream body; HttpRequest(Builder builder) { this.id = builder.id == null ? randomUUID() : builder.id; @@ -179,10 +174,9 @@ public static Builder patch(String uri) { * * @param uri URI * @param body body - * @param body type * @return {@code this} */ - public static Builder post(String uri, StyxObservable body) { + public static Builder post(String uri, ByteStream body) { return new Builder(POST, uri).body(body); } @@ -191,10 +185,9 @@ public static Builder post(String uri, StyxObservable body) { * * @param uri URI * @param body body - * @param body type * @return {@code this} */ - public static Builder put(String uri, StyxObservable body) { + public static Builder put(String uri, ByteStream body) { return new Builder(PUT, uri).body(body); } @@ -203,10 +196,9 @@ public static Builder put(String uri, StyxObservable body) { * * @param uri URI * @param body body - * @param body type * @return {@code this} */ - public static Builder patch(String uri, StyxObservable body) { + public static Builder patch(String uri, ByteStream body) { return new Builder(PATCH, uri).body(body); } @@ -239,7 +231,7 @@ public List headers(CharSequence name) { * @return request body as a byte stream */ @Override - public StyxObservable body() { + public ByteStream body() { return body; } @@ -339,11 +331,11 @@ public Builder newBuilder() { * Returns a {@link StyxObservable} that eventually produces a * {@link FullHttpRequest}. The resulting full request object has the same * request line, headers, and content as this request. - * + *

* The content stream is aggregated asynchronously. The stream may be connected * to a network socket or some other content producer. Once aggregated, a * FullHttpRequest object is emitted on the returned {@link StyxObservable}. - * + *

* A sole {@code maxContentBytes} argument is a backstop defence against excessively * long content streams. The {@code maxContentBytes} should be set to a sensible * value according to your application requirements and heap size. When the content @@ -354,34 +346,17 @@ public Builder newBuilder() { * @return a {@link StyxObservable} */ public StyxObservable toFullRequest(int maxContentBytes) { - CompositeByteBuf byteBufs = compositeBuffer(); - - return new StyxCoreObservable<>( - ((StyxCoreObservable) body) - .delegate() - .lift(disableFlowControl()) - .doOnError(e -> byteBufs.release()) - .collect(() -> byteBufs, (composite, part) -> { - long newSize = composite.readableBytes() + part.readableBytes(); - - if (newSize > maxContentBytes) { - release(composite); - release(part); - - throw new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes)); - } - composite.addComponent(part); - composite.writerIndex(composite.writerIndex() + part.readableBytes()); - }) - .map(HttpRequest::decodeAndRelease) - .map(decoded -> new FullHttpRequest.Builder(this, decoded).build())); + return StyxObservable.from( + body.aggregate(maxContentBytes) + .thenApply(it -> new FullHttpRequest.Builder(this, decodeAndRelease(it)).build()) + ); } - private static byte[] decodeAndRelease(CompositeByteBuf aggregate) { + private static byte[] decodeAndRelease(Buffer aggregate) { try { - return getBytes(aggregate); + return getBytes(aggregate.delegate()); } finally { - aggregate.release(); + aggregate.delegate().release(); } } @@ -429,7 +404,7 @@ public static final class Builder { private Url url; private HttpHeaders.Builder headers; private HttpVersion version = HTTP_1_1; - private StyxObservable body; + private ByteStream body; /** * Creates a new {@link Builder} object with default attributes. @@ -437,14 +412,14 @@ public static final class Builder { public Builder() { this.url = Url.Builder.url("/").build(); this.headers = new HttpHeaders.Builder(); - this.body = new StyxCoreObservable<>(Observable.empty()); + this.body = new ByteStream(Flux.empty()); } /** * Creates a new {@link Builder} with specified HTTP method and URI. * * @param method a HTTP method - * @param uri a HTTP URI + * @param uri a HTTP URI */ public Builder(HttpMethod method, String uri) { this(); @@ -455,10 +430,10 @@ public Builder(HttpMethod method, String uri) { /** * Creates a new {@link Builder} from an existing request with a new body content stream. * - * @param request a HTTP request object + * @param request a HTTP request object * @param contentStream a body content stream */ - public Builder(HttpRequest request, StyxObservable contentStream) { + public Builder(HttpRequest request, ByteStream contentStream) { this.id = request.id(); this.method = httpMethod(request.method().name()); this.url = request.url(); @@ -482,7 +457,7 @@ public Builder(HttpRequest request, StyxObservable contentStream) { this.url = request.url(); this.version = request.version(); this.headers = request.headers().newBuilder(); - this.body = StyxCoreObservable.of(copiedBuffer(request.body())); + this.body = new ByteStream(Flux.just(new Buffer(copiedBuffer(request.body())))); } /** @@ -501,11 +476,22 @@ public Builder uri(String uri) { * @param content request body * @return {@code this} */ - public Builder body(StyxObservable content) { + public Builder body(ByteStream content) { this.body = content; return this; } + /** + * Transforms request body. + * + * @param transformation a Function from ByteStream to ByteStream. + * @return a HttpRequest builder with a transformed message body. + */ + public Builder body(Function transformation) { + this.body = transformation.apply(this.body); + return this; + } + /** * Sets the unique ID for this request. * diff --git a/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java b/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java index b1e2f65c33..94f4496563 100644 --- a/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java +++ b/components/api/src/main/java/com/hotels/styx/api/HttpResponse.java @@ -17,21 +17,17 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.util.ReferenceCountUtil; -import rx.Observable; +import reactor.core.publisher.Flux; -import java.nio.charset.Charset; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import static com.google.common.base.Objects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; -import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; import static com.hotels.styx.api.HttpHeaderNames.SET_COOKIE; import static com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING; @@ -43,11 +39,7 @@ import static com.hotels.styx.api.ResponseCookie.decode; import static com.hotels.styx.api.ResponseCookie.encode; import static io.netty.buffer.ByteBufUtil.getBytes; -import static io.netty.buffer.Unpooled.compositeBuffer; -import static io.netty.buffer.Unpooled.copiedBuffer; -import static io.netty.util.ReferenceCountUtil.release; import static java.lang.Integer.parseInt; -import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -62,7 +54,7 @@ *

* An {@code HttpResponse} object is immutable with respect to the response * attributes and HTTP headers. Once an instance is created, they cannot change. - * + *

* An {@code HttpResponse} body is a byte buffer stream that can be consumed * as sequence of asynchronous events. Once consumed, the stream is exhausted and * can not be reused. Conceptually each {@code HttpResponse} object has an @@ -70,16 +62,16 @@ * a Styx Server implements a content producer for {@link HttpInterceptor} * extensions. The producer receives data chunks from a network socket and * publishes them to an appropriate content stream. - * + *

* HTTP responses are created via {@code Builder} object, which can be created * with static helper methods: * *

    - *
  • {@code response()}
  • - *
  • {@code response(HttpResponseStatus)}
  • - *
  • {@code response(HttpResponseStatus, StyxObservable)}
  • + *
  • {@code response()}
  • + *
  • {@code response(HttpResponseStatus)}
  • + *
  • {@code response(HttpResponseStatus, StyxObservable)}
  • *
- * + *

* A builder can also be created with one of the {@code Builder} constructors. * * A special method {@code newBuilder} creates a prepopulated {@code Builder} @@ -90,7 +82,7 @@ public class HttpResponse implements StreamingHttpMessage { private final HttpVersion version; private final HttpResponseStatus status; private final HttpHeaders headers; - private final StyxObservable body; + private final ByteStream body; HttpResponse(Builder builder) { this.version = builder.version; @@ -125,7 +117,7 @@ public static Builder response(HttpResponseStatus status) { * @param body response body * @return a new builder */ - public static Builder response(HttpResponseStatus status, StyxObservable body) { + public static Builder response(HttpResponseStatus status, ByteStream body) { return new Builder(status).body(body); } @@ -141,7 +133,7 @@ public HttpHeaders headers() { * @return the response body as a byte stream */ @Override - public StyxObservable body() { + public ByteStream body() { return body; } @@ -185,11 +177,11 @@ public boolean isRedirect() { * Returns a {@link StyxObservable} that eventually produces a * {@link FullHttpResponse}. The resulting full response object has the same * response line, headers, and content as this response. - * + *

* The content stream is aggregated asynchronously. The stream may be connected * to a network socket or some other content producer. Once aggregated, a * FullHttpResponse object is emitted on the returned {@link StyxObservable}. - * + *

* A sole {@code maxContentBytes} argument is a backstop defence against excessively * long content streams. The {@code maxContentBytes} should be set to a sensible * value according to your application requirements and heap size. When the content @@ -200,40 +192,22 @@ public boolean isRedirect() { * @return a {@link StyxObservable} */ public StyxObservable toFullResponse(int maxContentBytes) { - CompositeByteBuf byteBufs = compositeBuffer(); - - Observable delegate = ((StyxCoreObservable) body) - .delegate() - .lift(disableFlowControl()) - .doOnError(e -> byteBufs.release()) - .collect(() -> byteBufs, (composite, part) -> { - long newSize = composite.readableBytes() + part.readableBytes(); - - if (newSize > maxContentBytes) { - release(composite); - release(part); - - throw new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxContentBytes)); - } - composite.addComponent(part); - composite.writerIndex(composite.writerIndex() + part.readableBytes()); - }) - .map(HttpResponse::decodeAndRelease) - .map(decoded -> new FullHttpResponse.Builder(this, decoded) - .disableValidation() - .build()); - - return new StyxCoreObservable<>(delegate); + return StyxObservable.from(body.aggregate(maxContentBytes)) + .map(it -> new FullHttpResponse.Builder(this, decodeAndRelease(it)) + .disableValidation() + .build() + ); } - private static byte[] decodeAndRelease(CompositeByteBuf aggregate) { + private static byte[] decodeAndRelease(Buffer aggregate) { try { - return getBytes(aggregate); + return getBytes(aggregate.delegate()); } finally { - aggregate.release(); + aggregate.delegate().release(); } } + /** * Decodes "Set-Cookie" header values and returns them as set of {@link ResponseCookie} objects. * @@ -248,7 +222,7 @@ public Set cookies() { * * @param name cookie name * @return an optional {@link ResponseCookie} value if corresponding cookie name is present, - * or {@link Optional#empty} if not. + * or {@link Optional#empty} if not. */ public Optional cookie(String name) { return cookies().stream() @@ -292,14 +266,14 @@ public static final class Builder { private HttpHeaders.Builder headers; private HttpVersion version = HTTP_1_1; private boolean validate = true; - private StyxObservable body; + private ByteStream body; /** * Creates a new {@link Builder} object with default attributes. */ public Builder() { this.headers = new HttpHeaders.Builder(); - this.body = new StyxCoreObservable<>(Observable.empty()); + this.body = new ByteStream(Flux.empty()); } /** @@ -329,16 +303,16 @@ public Builder(HttpResponse response) { * Creates a new {@link Builder} object from a response code and a content stream. *

* Builder's response status line parameters and the HTTP headers are populated from - * the given {@code response} object, but the content stream is set to {@code contentStream}. + * the given {@code response} object, but the content stream is set to {@code ByteStream}. * - * @param response a full response for which the builder is based on - * @param contentStream a content byte stream + * @param response a full response for which the builder is based on + * @param byteStream a content byte stream */ - public Builder(FullHttpResponse response, StyxObservable contentStream) { + public Builder(FullHttpResponse response, ByteStream byteStream) { this.status = statusWithCode(response.status().code()); this.version = httpVersion(response.version().toString()); this.headers = response.headers().newBuilder(); - this.body = contentStream; + this.body = requireNonNull(byteStream); } /** @@ -358,20 +332,20 @@ public Builder status(HttpResponseStatus status) { * @param content response body * @return {@code this} */ - public Builder body(StyxObservable content) { - this.body = content; + public Builder body(ByteStream content) { + this.body = requireNonNull(content); return this; } /** - * Sets the message body by encoding a {@link StyxObservable} of {@link String}s into bytes. + * Transforms request body. * - * @param contentObservable message body content. - * @param charset character set - * @return {@code this} + * @param transformation a Function from ByteStream to ByteStream. + * @return a HttpResponhse builder with a transformed message body. */ - public Builder body(StyxObservable contentObservable, Charset charset) { - return body(contentObservable.map(content -> copiedBuffer(content, charset))); + public Builder body(Function transformation) { + this.body = requireNonNull(transformation.apply(this.body)); + return this; } /** @@ -554,12 +528,8 @@ public Builder removeHeader(CharSequence name) { */ // TODO: See https://github.com/HotelsDotCom/styx/issues/201 public Builder removeBody() { - Observable delegate = ((StyxCoreObservable) body) - .delegate() - .doOnNext(ReferenceCountUtil::release) - .ignoreElements(); - - return body(new StyxCoreObservable<>(delegate)); + this.body = body.drop(); + return this; } @@ -581,8 +551,8 @@ public Builder headers(HttpHeaders headers) { * method is invoked. Specifically that: * *

  • - *
      There is maximum of only one {@code Content-Length} header
    - *
      The {@code Content-Length} header is zero or positive integer
    + *
      There is maximum of only one {@code Content-Length} header
    + *
      The {@code Content-Length} header is zero or positive integer
    *
  • * * @return {@code this} @@ -597,16 +567,16 @@ public Builder disableValidation() { *

    * Validates and builds a {link HttpResponse} object. Object validation can be * disabled with {@link this.disableValidation} method. - * + *

    * When validation is enabled (by default), ensures that: * *

  • - *
      There is maximum of only one {@code Content-Length} header
    - *
      The {@code Content-Length} header is zero or positive integer
    + *
      There is maximum of only one {@code Content-Length} header
    + *
      The {@code Content-Length} header is zero or positive integer
    *
  • * - * @throws IllegalArgumentException when validation fails * @return a new full response. + * @throws IllegalArgumentException when validation fails */ public HttpResponse build() { if (validate) { diff --git a/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java b/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java index 8e838eb82a..2b0947cedb 100644 --- a/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java +++ b/components/api/src/main/java/com/hotels/styx/api/StreamingHttpMessage.java @@ -15,12 +15,8 @@ */ package com.hotels.styx.api; -import io.netty.buffer.ByteBuf; -import rx.Subscriber; - import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_TYPE; @@ -48,7 +44,7 @@ interface StreamingHttpMessage { * * @return the body */ - StyxObservable body(); + ByteStream body(); /** * Returns the value of the header with the specified {@code name}. @@ -97,27 +93,4 @@ default boolean chunked() { return HttpMessageSupport.chunked(headers()); } - default CompletableFuture releaseContentBuffers() { - CompletableFuture future = new CompletableFuture<>(); - - ((StyxCoreObservable) body()).delegate() - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - future.complete(true); - } - - @Override - public void onError(Throwable e) { - future.completeExceptionally(e); - } - - @Override - public void onNext(ByteBuf byteBuf) { - byteBuf.release(); - } - }); - - return future; - } } diff --git a/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java b/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java new file mode 100644 index 0000000000..17186c6caa --- /dev/null +++ b/components/api/src/test/java/com/hotels/styx/api/ByteStreamAggregatorTest.java @@ -0,0 +1,157 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; +import reactor.test.publisher.TestPublisher; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertTrue; + +public class ByteStreamAggregatorTest { + + @Test(expectedExceptions = IllegalStateException.class) + public void allowsOnlyOneAggregation() { + Publisher upstream = Flux.just(new Buffer("x", UTF_8)); + ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100); + + aggregator.apply(); + aggregator.apply(); + } + + @Test + public void aggregatesZeroBuffers() throws ExecutionException, InterruptedException { + ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.empty(), 100); + + Buffer a = aggregator.apply().get(); + assertThat(a.size(), is(0)); + assertThat(new String(a.content(), UTF_8), is("")); + } + + @Test + public void aggregatesOneBuffer() throws ExecutionException, InterruptedException { + ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.just(new Buffer("x", UTF_8)), 100); + + Buffer a = aggregator.apply().get(); + assertThat(a.size(), is(1)); + assertThat(new String(a.content(), UTF_8), is("x")); + } + + @Test + public void aggregatesManyBuffers() throws ExecutionException, InterruptedException { + ByteStreamAggregator aggregator = new ByteStreamAggregator(Flux.just( + new Buffer("x", UTF_8), + new Buffer("y", UTF_8)), 100); + + Buffer a = aggregator.apply().get(); + assertThat(a.size(), is(2)); + assertThat(new String(a.content(), UTF_8), is("xy")); + } + + @Test + public void aggregatesUpToNBytes() { + AtomicReference causeCapture = new AtomicReference<>(null); + + Buffer a = new Buffer("aaabbb", UTF_8); + Buffer b = new Buffer("ccc", UTF_8); + + TestPublisher upstream = TestPublisher.create(); + + ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 8); + + CompletableFuture future = aggregator.apply() + .exceptionally(cause -> { + causeCapture.set(cause); + throw new RuntimeException(); + }); + + upstream.next(a); + upstream.next(b); + + upstream.assertCancelled(); + + assertTrue(future.isCompletedExceptionally()); + assertThat(causeCapture.get(), instanceOf(ContentOverflowException.class)); + + assertThat(a.delegate().refCnt(), is(0)); + assertThat(b.delegate().refCnt(), is(0)); + } + + @Test(expectedExceptions = NullPointerException.class) + public void checkForNullSubscription() { + Publisher upstream = mock(Publisher.class); + ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100); + + aggregator.onSubscribe(null); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void allowsOnlyOneSubscription() { + Publisher upstream = mock(Publisher.class); + Subscription subscription1 = mock(Subscription.class); + Subscription subscription2 = mock(Subscription.class); + + ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 100); + aggregator.onSubscribe(subscription1); + + try { + aggregator.onSubscribe(subscription2); + } catch (IllegalStateException cause) { + verify(subscription2).cancel(); + throw cause; + } + } + + @Test + public void emitsErrors() { + AtomicReference causeCapture = new AtomicReference<>(null); + + Buffer a = new Buffer("aaabbb", UTF_8); + + TestPublisher upstream = TestPublisher.create(); + + ByteStreamAggregator aggregator = new ByteStreamAggregator(upstream, 8); + + CompletableFuture future = aggregator.apply() + .exceptionally(cause -> { + causeCapture.set(cause); + throw new RuntimeException(); + }); + + upstream.next(a); + upstream.error(new RuntimeException("something broke")); + + upstream.assertCancelled(); + + assertTrue(future.isCompletedExceptionally()); + assertThat(causeCapture.get(), instanceOf(RuntimeException.class)); + assertThat(causeCapture.get().getMessage(), is("something broke")); + + assertThat(a.delegate().refCnt(), is(0)); + } +} \ No newline at end of file diff --git a/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java b/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java new file mode 100644 index 0000000000..e285cf6783 --- /dev/null +++ b/components/api/src/test/java/com/hotels/styx/api/ByteStreamTest.java @@ -0,0 +1,190 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + + +public class ByteStreamTest { + private Buffer buf1; + private Buffer buf2; + private Buffer buf3; + + @BeforeMethod + public void setUp() { + buf1 = new Buffer("a", UTF_8); + buf2 = new Buffer("b", UTF_8); + buf3 = new Buffer("c", UTF_8); + } + + @Test + public void publishesContent() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + StepVerifier.create(stream) + .expectNext(buf1) + .expectNext(buf2) + .expectNext(buf3) + .verifyComplete(); + } + + @Test + public void supportsBackpressure() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + StepVerifier.create(stream, 0) + .expectSubscription() + .thenRequest(1) + .expectNext(buf1) + .thenRequest(2) + .expectNext(buf2, buf3) + .verifyComplete(); + } + + @Test + public void mapsContent() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + ByteStream mapped = stream.map(this::toUpperCase); + + StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String)) + .expectSubscription() + .expectNext("A", "B", "C") + .verifyComplete(); + } + + @Test + public void releasesRefcountForMappedBuffers() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + ByteStream mapped = stream.map(this::toUpperCase); + + StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String)) + .expectSubscription() + .expectNext("A", "B", "C") + .verifyComplete(); + + assertThat(buf1.delegate().refCnt(), is(0)); + assertThat(buf2.delegate().refCnt(), is(0)); + assertThat(buf3.delegate().refCnt(), is(0)); + } + + @Test + public void mapRetainsRefcountsForInlineBufferChanges() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + ByteStream mapped = stream.map(buf -> buf); + + StepVerifier.create(Flux.from(mapped).map(this::decodeUtf8String)) + .expectSubscription() + .expectNextCount(3) + .verifyComplete(); + + assertThat(buf1.delegate().refCnt(), is(1)); + assertThat(buf2.delegate().refCnt(), is(1)); + assertThat(buf3.delegate().refCnt(), is(1)); + } + + @Test + public void discardsContent() { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + ByteStream discarded = stream.drop(); + + StepVerifier.create(discarded) + .expectSubscription() + .verifyComplete(); + + assertThat(buf1.delegate().refCnt(), is(0)); + assertThat(buf2.delegate().refCnt(), is(0)); + assertThat(buf3.delegate().refCnt(), is(0)); + } + + @Test + public void aggregatesContent() throws ExecutionException, InterruptedException { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + Buffer aggregated = stream.aggregate(100).get(); + assertThat(decodeUtf8String(aggregated), is("abc")); + } + + @Test + public void contentAggregationOverflow() throws ExecutionException, InterruptedException { + ByteStream stream = new ByteStream(Flux.just(buf1, buf2, buf3)); + + Throwable cause = stream.aggregate(2) + .handle((result, throwable) -> throwable) + .get(); + + assertThat(cause, instanceOf(ContentOverflowException.class)); + } + + + @Test + public void deliversAtEndOfStreamNotification() { + AtomicBoolean terminated = new AtomicBoolean(); + ByteStream stream = new ByteStream(Flux.just(buf1, buf2)) + .doOnEnd(maybeCause -> terminated.set(maybeCause == Optional.empty())); + + StepVerifier.create(new ByteStream(stream), 0) + .thenRequest(1) + .expectNext(buf1) + .then(() -> assertThat(terminated.get(), is(false))) + .thenRequest(1) + .expectNext(buf2) + .then(() -> assertThat(terminated.get(), is(true))) + .expectComplete() + .verify(); + } + + @Test + public void deliversAtEndOfStreamNotificationWhenTerminated() { + AtomicBoolean terminated = new AtomicBoolean(); + ByteStream stream = new ByteStream(Flux.just(buf1, buf2).concatWith(Flux.error(new RuntimeException("bang!")))) + .doOnEnd(maybeCause -> terminated.set(maybeCause.isPresent())); + + StepVerifier.create(new ByteStream(stream), 0) + .thenRequest(1) + .expectNext(buf1) + .then(() -> assertThat(terminated.get(), is(false))) + .thenRequest(1) + .expectNext(buf2) + .then(() -> assertThat(terminated.get(), is(true))) + .expectError() + .verify(); + } + + private String decodeUtf8String(Buffer buffer) { + return new String(buffer.content(), UTF_8); + } + + private Buffer toUpperCase(Buffer buffer) { + return new Buffer(decodeUtf8String(buffer).toUpperCase(), UTF_8); + } + +} diff --git a/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java b/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java deleted file mode 100644 index aae857e33b..0000000000 --- a/components/api/src/test/java/com/hotels/styx/api/FlowControlDisableOperatorTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.api; - -import io.netty.buffer.ByteBuf; -import org.testng.annotations.Test; -import rx.observers.TestSubscriber; -import rx.subjects.PublishSubject; - -import static com.hotels.styx.api.FlowControlDisableOperator.disableFlowControl; -import static io.netty.buffer.Unpooled.copiedBuffer; -import static java.nio.charset.StandardCharsets.UTF_8; -import static rx.Observable.just; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -public class FlowControlDisableOperatorTest { - /* - This test proves that when the subscriber requests back-pressure, but the FlowControlDisableOperator is applied, - the events are delivered as if no back-pressure was requested. - */ - @Test - public void liftedObservableDisablesBackpressure() throws InterruptedException { - TestSubscriber subscriber = new TestSubscriber<>(1); - - just("one", "two", "three") - .lift(disableFlowControl()) - .subscribe(subscriber); - - subscriber.assertValues("one", "two", "three"); - } - - /* - This test does not actually test the FlowControlDisableOperator, rather it is used to demonstrate the difference - that the presence of the operator makes to the delivery of events. - */ - @Test - public void nonLiftedObservableCanBeSubscribedWithBackpressure() { - TestSubscriber subscriber = new TestSubscriber<>(1); - - just("one", "two", "three") - .subscribe(subscriber); - - subscriber.assertValues("one"); - subscriber.requestMore(1); - subscriber.assertValues("one", "two"); - subscriber.requestMore(1); - subscriber.assertValues("one", "two", "three"); - } - - @Test - public void releasesReferenceCountsAfterDownstreamUnsubscribes() throws Exception { - TestSubscriber subscriber = new TestSubscriber<>(); - PublishSubject source = PublishSubject.create(); - - ByteBuf buf1 = copiedBuffer("abc", UTF_8); - ByteBuf buf2 = copiedBuffer("abc", UTF_8); - ByteBuf buf3 = copiedBuffer("abc", UTF_8); - - source.lift(disableFlowControl()) - .subscribe(subscriber); - - source.onNext(buf1); - assertThat(buf1.refCnt(), is(1)); - - subscriber.unsubscribe(); - - source.onNext(buf2); - assertThat(buf2.refCnt(), is(0)); - - source.onNext(buf3); - assertThat(buf3.refCnt(), is(0)); - } -} \ No newline at end of file diff --git a/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java b/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java index 1da4205415..2b072354f8 100644 --- a/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java +++ b/components/api/src/test/java/com/hotels/styx/api/FullHttpRequestTest.java @@ -16,10 +16,10 @@ package com.hotels.styx.api; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import rx.observers.TestSubscriber; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; import java.util.Optional; @@ -30,13 +30,13 @@ import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; import static com.hotels.styx.api.HttpHeaderNames.COOKIE; import static com.hotels.styx.api.HttpHeaderNames.HOST; -import static com.hotels.styx.api.Url.Builder.url; -import static com.hotels.styx.api.RequestCookie.requestCookie; import static com.hotels.styx.api.HttpMethod.DELETE; import static com.hotels.styx.api.HttpMethod.GET; import static com.hotels.styx.api.HttpMethod.POST; import static com.hotels.styx.api.HttpVersion.HTTP_1_0; import static com.hotels.styx.api.HttpVersion.HTTP_1_1; +import static com.hotels.styx.api.RequestCookie.requestCookie; +import static com.hotels.styx.api.Url.Builder.url; import static com.hotels.styx.support.matchers.IsOptional.isAbsent; import static com.hotels.styx.support.matchers.IsOptional.isValue; import static com.hotels.styx.support.matchers.MapMatcher.isMap; @@ -86,13 +86,9 @@ public void convertsToStreamingHttpRequest() throws Exception { public void convertsToStreamingHttpRequestWithEmptyBody(FullHttpRequest fullRequest) { HttpRequest streaming = fullRequest.toStreamingRequest(); - TestSubscriber subscriber = TestSubscriber.create(0); - subscriber.requestMore(1); - - ((StyxCoreObservable) streaming.body()).delegate().subscribe(subscriber); - - assertThat(subscriber.getOnNextEvents().size(), is(0)); - subscriber.assertCompleted(); + StepVerifier.create(streaming.body()) + .expectComplete() + .verify(); } // We want to ensure that these are all considered equivalent @@ -234,12 +230,13 @@ public void requestBodyCannotBeChangedViaStreamingRequest() { .body("original", UTF_8) .build(); - ByteBuf byteBuf = ((StyxCoreObservable) original.toStreamingRequest().body()) - .delegate() - .toBlocking() - .first(); - - byteBuf.array()[0] = 'A'; + Flux.from(original.toStreamingRequest() + .body() + .map(buffer -> { + buffer.delegate().array()[0] = 'A'; + return buffer; + })) + .subscribe(); assertThat(original.bodyAs(UTF_8), is("original")); } diff --git a/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java b/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java index fc33891648..ec351704e6 100644 --- a/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java +++ b/components/api/src/test/java/com/hotels/styx/api/FullHttpResponseTest.java @@ -15,11 +15,10 @@ */ package com.hotels.styx.api; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import rx.observers.TestSubscriber; +import reactor.core.publisher.Flux; import java.util.Optional; import java.util.Set; @@ -28,8 +27,6 @@ import static com.hotels.styx.api.FullHttpResponse.response; import static com.hotels.styx.api.HttpHeader.header; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; -import static com.hotels.styx.api.ResponseCookie.responseCookie; -import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable; import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY; import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST; import static com.hotels.styx.api.HttpResponseStatus.CREATED; @@ -41,6 +38,8 @@ import static com.hotels.styx.api.HttpResponseStatus.SEE_OTHER; import static com.hotels.styx.api.HttpResponseStatus.TEMPORARY_REDIRECT; import static com.hotels.styx.api.HttpVersion.HTTP_1_1; +import static com.hotels.styx.api.ResponseCookie.responseCookie; +import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable; import static com.hotels.styx.support.matchers.IsOptional.isAbsent; import static com.hotels.styx.support.matchers.IsOptional.isValue; import static java.nio.charset.StandardCharsets.UTF_16; @@ -322,16 +321,14 @@ public void overridesContent() { } @Test(dataProvider = "emptyBodyResponses") - public void convertsToStreamingHttpResponseWithEmptyBody(FullHttpResponse response) { + public void convertsToStreamingHttpResponseWithEmptyBody(FullHttpResponse response) throws ExecutionException, InterruptedException { HttpResponse streaming = response.toStreamingResponse(); - TestSubscriber subscriber = TestSubscriber.create(0); - subscriber.requestMore(1); - - ((StyxCoreObservable) streaming.body()).delegate().subscribe(subscriber); + byte[] result = streaming.body().aggregate(1000) + .get() + .content(); - assertThat(subscriber.getOnNextEvents().size(), is(0)); - subscriber.assertCompleted(); + assertThat(result.length, is(0)); } // We want to ensure that these are all considered equivalent @@ -432,29 +429,30 @@ public void responseBodyCannotBeChangedViaStreamingMessage() { .body("original", UTF_8) .build(); - ByteBuf byteBuf = ((StyxCoreObservable) original.toStreamingResponse().body()) - .delegate() - .toBlocking() - .first(); - - byteBuf.array()[0] = 'A'; + Flux.from(original.toStreamingResponse() + .body() + .map(buf -> { + buf.delegate().array()[0] = 'A'; + return buf; + })) + .subscribe(); assertThat(original.bodyAs(UTF_8), is("original")); } - @Test(expectedExceptions = io.netty.util.IllegalReferenceCountException.class) + @Test public void toFullResponseReleasesOriginalRefCountedBuffers() throws ExecutionException, InterruptedException { - ByteBuf content = Unpooled.copiedBuffer("original", UTF_8); + Buffer content = new Buffer(Unpooled.copiedBuffer("original", UTF_8)); HttpResponse original = HttpResponse.response(OK) - .body(StyxObservable.of(content)) + .body(new ByteStream(Flux.just(content))) .build(); original.toFullResponse(100) .asCompletableFuture() .get(); - content.array()[0] = 'A'; + assertThat(content.delegate().refCnt(), is(0)); } @Test diff --git a/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java b/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java index 18204b3c95..7c1d74002b 100644 --- a/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java +++ b/components/api/src/test/java/com/hotels/styx/api/HttpRequestTest.java @@ -16,10 +16,9 @@ package com.hotels.styx.api; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import reactor.core.publisher.Flux; import java.util.concurrent.ExecutionException; import java.util.stream.Stream; @@ -41,7 +40,6 @@ import static com.hotels.styx.support.matchers.IsOptional.isAbsent; import static com.hotels.styx.support.matchers.IsOptional.isValue; import static com.hotels.styx.support.matchers.MapMatcher.isMap; -import static io.netty.buffer.Unpooled.copiedBuffer; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -77,19 +75,19 @@ public void decodesToFullHttpRequest() throws Exception { assertThat(full.body(), is(bytes("foobar"))); } - @Test(expectedExceptions = io.netty.util.IllegalReferenceCountException.class) + @Test public void toFullRequestReleasesOriginalReferenceCountedBuffers() throws ExecutionException, InterruptedException { - ByteBuf content = Unpooled.copiedBuffer("original", UTF_8); + Buffer content = new Buffer("original", UTF_8); HttpRequest original = HttpRequest.get("/foo") - .body(StyxObservable.of(content)) + .body(new ByteStream(Flux.just(content))) .build(); FullHttpRequest fullRequest = original.toFullRequest(100) .asCompletableFuture() .get(); - content.array()[0] = 'A'; + assertThat(content.delegate().refCnt(), is(0)); assertThat(fullRequest.bodyAs(UTF_8), is("original")); } @@ -108,12 +106,12 @@ public void encodesToStreamingHttpRequestWithEmptyBody(HttpRequest streamingRequ private Object[][] emptyBodyRequests() { return new Object[][]{ {get("/foo/bar").build()}, - {post("/foo/bar", StyxCoreObservable.empty()).build()}, + {post("/foo/bar", new ByteStream(Flux.empty())).build()}, }; } @Test - public void createsARequestWithDefaultValues() throws Exception { + public void createsARequestWithDefaultValues() { HttpRequest request = get("/index").build(); assertThat(request.version(), is(HTTP_1_1)); assertThat(request.url().toString(), is("/index")); @@ -295,7 +293,7 @@ public void canRemoveAHeader() { @Test public void shouldSetsContentLengthForNonStreamingBodyMessage() { - assertThat(put("/home").body(StyxObservable.of(copiedBuffer("Hello", UTF_8))).build().header(CONTENT_LENGTH), isAbsent()); + assertThat(put("/home").body(new ByteStream(Flux.just(new Buffer("Hello", UTF_8)))).build().header(CONTENT_LENGTH), isAbsent()); } @Test @@ -428,16 +426,24 @@ public void removesCookiesInSameBuilder() { assertThat(r1.cookie("x"), isAbsent()); } - private static StyxObservable body(String... contents) { - return StyxObservable.from(Stream.of(contents) - .map(content -> copiedBuffer(content, UTF_8)) - .collect(toList())); + private static ByteStream body(String... contents) { + + return new ByteStream( + Flux.fromIterable( + Stream.of(contents) + .map(content -> new Buffer(content, UTF_8)) + .collect(toList()))); } - private static String bytesToString(StyxObservable body) throws ExecutionException, InterruptedException { - return body.reduce((byteBuf, result) -> result + byteBuf.toString(UTF_8), "") - .asCompletableFuture() - .get(); + private static String bytesToString(ByteStream body) { + try { + return new String(body.aggregate(100000).get().content(), UTF_8); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } private static byte[] bytes(String content) { diff --git a/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java b/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java index 0aa5efd7ef..f85f355e74 100644 --- a/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java +++ b/components/api/src/test/java/com/hotels/styx/api/HttpResponseTest.java @@ -15,20 +15,16 @@ */ package com.hotels.styx.api; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import reactor.core.publisher.Flux; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import static com.hotels.styx.api.HttpHeader.header; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; -import static com.hotels.styx.api.ResponseCookie.responseCookie; -import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable; import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY; import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST; import static com.hotels.styx.api.HttpResponseStatus.CREATED; @@ -41,11 +37,12 @@ import static com.hotels.styx.api.HttpResponseStatus.TEMPORARY_REDIRECT; import static com.hotels.styx.api.HttpVersion.HTTP_1_0; import static com.hotels.styx.api.HttpVersion.HTTP_1_1; +import static com.hotels.styx.api.ResponseCookie.responseCookie; +import static com.hotels.styx.api.matchers.HttpHeadersMatcher.isNotCacheable; import static com.hotels.styx.support.matchers.IsOptional.isAbsent; import static com.hotels.styx.support.matchers.IsOptional.isValue; -import static java.nio.charset.StandardCharsets.UTF_16; +import static io.netty.buffer.Unpooled.copiedBuffer; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -54,16 +51,16 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; public class HttpResponseTest { + @Test public void encodesToFullHttpResponse() throws Exception { HttpResponse response = response(CREATED) .version(HTTP_1_0) .header("HeaderName", "HeaderValue") .cookies(responseCookie("CookieName", "CookieValue").build()) - .body(body("foo", "bar")) + .body(new ByteStream(Flux.just("foo", "bar").map(it -> new Buffer(copiedBuffer(it, UTF_8))))) .build(); FullHttpResponse full = response.toFullResponse(0x1000) @@ -92,7 +89,7 @@ public void encodesToFullHttpResponseWithEmptyBody(HttpResponse response) throws private Object[][] emptyBodyResponses() { return new Object[][]{ {response().build()}, - {response().body(StyxCoreObservable.empty()).build()}, + {response().body(new ByteStream(Flux.empty())).build()}, }; } @@ -169,16 +166,22 @@ public void canRemoveAHeader() { } @Test - public void canRemoveResponseBody() { + public void canRemoveResponseBody() throws ExecutionException, InterruptedException { + Buffer originalContent = new Buffer("I'm going to get removed.", UTF_8); + HttpResponse response = response(NO_CONTENT) - .body(body("shouldn't be here")) + .body(new ByteStream(Flux.just(originalContent))) .build(); - HttpResponse shouldClearBody = response.newBuilder() - .body(null) - .build(); + FullHttpResponse fullResponse = response.newBuilder() + .body(ByteStream::drop) + .build() + .toFullResponse(1000) + .asCompletableFuture() + .get(); - assertThat(shouldClearBody.body(), is(nullValue())); + assertThat(fullResponse.body().length, is(0)); + assertThat(originalContent.delegate().refCnt(), is(0)); } @Test @@ -294,28 +297,6 @@ public void rejectsInvalidContentLength() { .build(); } - @Test - public void encodesBodyWithCharset() throws InterruptedException, ExecutionException, TimeoutException { - StyxObservable o = StyxObservable.of("Hello, World!"); - - FullHttpResponse responseUtf8 = response() - .body(o, UTF_8) - .build() - .toFullResponse(1_000_000) - .asCompletableFuture() - .get(1, SECONDS); - - FullHttpResponse responseUtf16 = response() - .body(o, UTF_16) - .build() - .toFullResponse(1_000_000) - .asCompletableFuture() - .get(1, SECONDS); - - assertThat(responseUtf8.body(), is("Hello, World!".getBytes(UTF_8))); - assertThat(responseUtf16.body(), is("Hello, World!".getBytes(UTF_16))); - } - @Test public void addsCookies() { HttpResponse response = response() @@ -380,17 +361,15 @@ private static HttpResponse.Builder response(HttpResponseStatus status) { return HttpResponse.response(status); } - private static StyxObservable body(String... contents) { - return StyxObservable.from(Stream.of(contents) - .map(content -> Unpooled.copiedBuffer(content, UTF_8)) - .collect(toList())); + private static ByteStream body(String... contents) { + return new ByteStream(Flux.fromIterable( + Stream.of(contents) + .map(content -> new Buffer(copiedBuffer(content, UTF_8))) + .collect(toList()))); } - private static String bytesToString(StyxObservable body) throws Exception { - return body.reduce((buf, result) -> result + buf.toString(UTF_8), "") - .asCompletableFuture() - .get(); - + private static String bytesToString(ByteStream body) throws Exception { + return new String(body.aggregate(100000).get().content(), UTF_8); } private static byte[] bytes(String s) { diff --git a/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java b/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java index 65349a15a8..ef6c350a5c 100644 --- a/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java +++ b/components/api/src/test/java/com/hotels/styx/api/ServerCookieEncoderTest.java @@ -15,7 +15,6 @@ */ package com.hotels.styx.api; -import com.hotels.styx.api.ServerCookieEncoder; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; import org.testng.annotations.Test; diff --git a/components/client/pom.xml b/components/client/pom.xml index 589ddb6e82..707bad034d 100644 --- a/components/client/pom.xml +++ b/components/client/pom.xml @@ -77,6 +77,12 @@ org.mockito mockito-all
    + + + io.reactivex + rxjava-reactive-streams + + diff --git a/components/client/src/main/java/com/hotels/styx/client/Transport.java b/components/client/src/main/java/com/hotels/styx/client/Transport.java index 97c12c34b7..d928d8e514 100644 --- a/components/client/src/main/java/com/hotels/styx/client/Transport.java +++ b/components/client/src/main/java/com/hotels/styx/client/Transport.java @@ -100,7 +100,8 @@ private Observable connection(HttpRequest request, Optional { - request.releaseContentBuffers(); + // Aggregates an empty body: + request.body().drop().aggregate(1); return Observable.error(new NoAvailableHostsException(appId)); }); } diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index f12fcb5903..80933b1807 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -16,12 +16,13 @@ package com.hotels.styx.client.netty.connectionpool; import com.google.common.annotations.VisibleForTesting; +import com.hotels.styx.api.Buffers; +import com.hotels.styx.api.HttpMethod; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; -import com.hotels.styx.api.extension.Origin; -import com.hotels.styx.api.exceptions.TransportLostException; -import com.hotels.styx.api.HttpMethod; import com.hotels.styx.api.HttpVersion; +import com.hotels.styx.api.exceptions.TransportLostException; +import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.Operation; import com.hotels.styx.client.OriginStatsFactory; import com.hotels.styx.common.logging.HttpRequestMessageLogger; @@ -46,13 +47,13 @@ import static com.google.common.base.Objects.toStringHelper; import static com.hotels.styx.api.HttpHeaderNames.HOST; -import static com.hotels.styx.api.StyxInternalObservables.toRxObservable; import static com.hotels.styx.api.extension.service.BackendService.DEFAULT_RESPONSE_TIMEOUT_MILLIS; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.slf4j.LoggerFactory.getLogger; +import static rx.RxReactiveStreams.toObservable; /** * An operation that writes an HTTP request to an origin. @@ -251,7 +252,8 @@ public void write() { private ChannelFutureListener subscribeToResponseBody() { return future -> { if (future.isSuccess()) { - toRxObservable(request.body()).subscribe(requestBodyChunkSubscriber); + Observable bufferObservable = toObservable(request.body()).map(Buffers::toByteBuf); + bufferObservable.subscribe(requestBodyChunkSubscriber); } else { LOGGER.error(format("error writing body to origin=%s request=%s", nettyConnection.getOrigin(), request), future.cause()); responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java index d16f14efb9..b580a97b91 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java @@ -16,11 +16,13 @@ package com.hotels.styx.client.netty.connectionpool; import com.google.common.annotations.VisibleForTesting; +import com.hotels.styx.api.Buffers; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; -import com.hotels.styx.api.extension.Origin; import com.hotels.styx.api.exceptions.ResponseTimeoutException; import com.hotels.styx.api.exceptions.TransportLostException; +import com.hotels.styx.api.extension.Origin; import com.hotels.styx.client.BadHttpResponseException; import com.hotels.styx.client.StyxClientException; import io.netty.buffer.ByteBuf; @@ -41,13 +43,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.hotels.styx.api.HttpResponse.response; -import static com.hotels.styx.api.StyxInternalObservables.fromRxObservable; import static com.hotels.styx.api.HttpResponseStatus.statusWithCode; import static io.netty.util.ReferenceCountUtil.retain; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.StreamSupport.stream; import static org.slf4j.LoggerFactory.getLogger; +import static rx.RxReactiveStreams.toPublisher; /** * A netty channel handler that reads from a channel and pass the message to a {@link Subscriber}. @@ -223,7 +225,7 @@ static HttpResponse.Builder toStyxResponse(io.netty.handler.codec.http.HttpRespo private static HttpResponse toStyxResponse(io.netty.handler.codec.http.HttpResponse nettyResponse, Observable contentObservable, Origin origin) { try { return toStyxResponse(nettyResponse) - .body(fromRxObservable(contentObservable)) + .body(new ByteStream(toPublisher(contentObservable.map(Buffers::fromByteBuf)))) .build(); } catch (IllegalArgumentException e) { throw new BadHttpResponseException(origin, e); diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/TransportTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/TransportTest.java index 218af9ebc9..14deb7bf4d 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/TransportTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/TransportTest.java @@ -16,13 +16,13 @@ package com.hotels.styx.client; import com.google.common.collect.ImmutableList; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.Id; -import com.hotels.styx.api.StyxObservable; import com.hotels.styx.api.exceptions.NoAvailableHostsException; import com.hotels.styx.client.connectionpool.ConnectionPool; -import io.netty.buffer.ByteBuf; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import rx.Observable; @@ -32,10 +32,10 @@ import java.util.Optional; +import static com.hotels.styx.api.Buffers.toByteBuf; import static com.hotels.styx.api.HttpRequest.get; import static com.hotels.styx.api.HttpResponseStatus.OK; import static com.hotels.styx.api.Id.id; -import static io.netty.buffer.Unpooled.copiedBuffer; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -45,6 +45,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static rx.Observable.just; +import static rx.RxReactiveStreams.toPublisher; public class TransportTest { private static final String X_STYX_ORIGIN_ID = "X-Styx-Origin-Id"; @@ -174,24 +175,22 @@ public void closesIfObservableUnsubscribedAfterHeaders() { @Test public void releasesContentStreamBuffersWhenPoolIsNotProvided() { - ByteBuf chunk1 = copiedBuffer("x", UTF_8); - ByteBuf chunk2 = copiedBuffer("y", UTF_8); - ByteBuf chunk3 = copiedBuffer("z", UTF_8); - - StyxObservable contentStream = StyxObservable.from(ImmutableList.of(chunk1, chunk2, chunk3)); + Buffer chunk1 = new Buffer("x", UTF_8); + Buffer chunk2 = new Buffer("y", UTF_8); + Buffer chunk3 = new Buffer("z", UTF_8); HttpRequest aRequest = request .newBuilder() - .body(contentStream) + .body(new ByteStream(toPublisher(Observable.from(ImmutableList.of(chunk1, chunk2, chunk3))))) .build(); transport.send(aRequest, Optional.empty(), APP_ID) .response() .subscribe(subscriber); - assertThat(chunk1.refCnt(), is(0)); - assertThat(chunk2.refCnt(), is(0)); - assertThat(chunk3.refCnt(), is(0)); + assertThat(toByteBuf(chunk1).refCnt(), is(0)); + assertThat(toByteBuf(chunk2).refCnt(), is(0)); + assertThat(toByteBuf(chunk3).refCnt(), is(0)); } @Test diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java index 8249524dbc..5ddf698b99 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagatorTest.java @@ -16,6 +16,7 @@ package com.hotels.styx.client.netty.connectionpool; import com.google.common.base.Throwables; +import com.hotels.styx.api.Buffers; import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.exceptions.ResponseTimeoutException; import com.hotels.styx.api.exceptions.TransportLostException; @@ -45,7 +46,6 @@ import static com.google.common.base.Charsets.UTF_8; import static com.hotels.styx.api.Id.GENERIC_APP; import static com.hotels.styx.api.ResponseCookie.responseCookie; -import static com.hotels.styx.api.StyxInternalObservables.toRxObservable; import static com.hotels.styx.api.extension.Origin.newOriginBuilder; import static com.hotels.styx.client.netty.connectionpool.NettyToStyxResponsePropagator.toStyxResponse; import static com.hotels.styx.support.matchers.IsOptional.isValue; @@ -63,6 +63,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.RxReactiveStreams.toObservable; public class NettyToStyxResponsePropagatorTest { private ByteBuf firstContentChunk = copiedBuffer("first chunk", UTF_8); @@ -327,7 +328,9 @@ private static HttpContent newHttpContent(String content) { private TestSubscriber subscribeToContent(HttpResponse response) { TestSubscriber contentSubscriber = new TestSubscriber<>(); - toRxObservable(response.body()).subscribe(contentSubscriber); + toObservable(response.body()) + .map(Buffers::toByteBuf) + .subscribe(contentSubscriber); return contentSubscriber; } diff --git a/components/common/src/main/java/com/hotels/styx/api/Buffers.java b/components/common/src/main/java/com/hotels/styx/api/Buffers.java new file mode 100644 index 0000000000..0bff6b9434 --- /dev/null +++ b/components/common/src/main/java/com/hotels/styx/api/Buffers.java @@ -0,0 +1,43 @@ +/* + Copyright (C) 2013-2018 Expedia Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.hotels.styx.api; + +import io.netty.buffer.ByteBuf; + +/** + * Conversions between Styx Buffer and Netty ByteBuf objects. + */ +public final class Buffers { + /** + * Builds a Styx Buffer from Netty ByteBuf. + * + * @param byteBuf + * @return + */ + public static Buffer fromByteBuf(ByteBuf byteBuf) { + return new Buffer(byteBuf); + } + + /** + * Returns a Netty ByteBuf corresponding to a Styx Buffer. + * + * @param buffer + * @return + */ + public static ByteBuf toByteBuf(Buffer buffer) { + return buffer.delegate(); + } +} diff --git a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StaticResponseHandler.java b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StaticResponseHandler.java index ef9194a9c5..8f66218ec5 100644 --- a/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StaticResponseHandler.java +++ b/components/proxy/src/main/java/com/hotels/styx/routing/handlers/StaticResponseHandler.java @@ -16,6 +16,8 @@ package com.hotels.styx.routing.handlers; import com.fasterxml.jackson.annotation.JsonProperty; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpHandler; import com.hotels.styx.api.HttpInterceptor; import com.hotels.styx.api.HttpRequest; @@ -25,9 +27,11 @@ import com.hotels.styx.routing.config.HttpHandlerFactory; import com.hotels.styx.routing.config.RouteHandlerDefinition; import com.hotels.styx.routing.config.RouteHandlerFactory; +import reactor.core.publisher.Flux; import java.util.List; +import static com.hotels.styx.api.HttpResponse.response; import static com.hotels.styx.api.HttpResponseStatus.statusWithCode; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @@ -46,7 +50,7 @@ private StaticResponseHandler(int status, String text) { @Override public StyxObservable handle(HttpRequest request, HttpInterceptor.Context context) { - return StyxObservable.of(HttpResponse.response(statusWithCode(status)).body(StyxObservable.of(text), UTF_8).build()); + return StyxObservable.of(response(statusWithCode(status)).body(new ByteStream(Flux.just(new Buffer(text, UTF_8)))).build()); } private static class StaticResponseConfig { diff --git a/components/proxy/src/main/java/com/hotels/styx/serviceproviders/ServiceProvision.java b/components/proxy/src/main/java/com/hotels/styx/serviceproviders/ServiceProvision.java index ae3cbcd411..65178d68d8 100644 --- a/components/proxy/src/main/java/com/hotels/styx/serviceproviders/ServiceProvision.java +++ b/components/proxy/src/main/java/com/hotels/styx/serviceproviders/ServiceProvision.java @@ -19,15 +19,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.hotels.styx.api.Environment; +import com.hotels.styx.api.configuration.Configuration; +import com.hotels.styx.api.configuration.ConfigurationException; +import com.hotels.styx.api.configuration.ServiceFactory; import com.hotels.styx.api.extension.ActiveOrigins; import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancer; import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancerFactory; import com.hotels.styx.api.extension.retrypolicy.spi.RetryPolicy; import com.hotels.styx.api.extension.retrypolicy.spi.RetryPolicyFactory; -import com.hotels.styx.api.configuration.Configuration; -import com.hotels.styx.api.configuration.ConfigurationException; -import com.hotels.styx.api.configuration.ServiceFactory; -import com.hotels.styx.client.StyxBackendServiceClient; import com.hotels.styx.common.Pair; import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig; import com.hotels.styx.spi.config.ServiceFactoryConfig; @@ -57,7 +56,7 @@ private ServiceProvision() { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvision.class); /** - * Create a {@link StyxBackendServiceClient} related factory configured with a particular key, + * Create a {@link com.hotels.styx.client.StyxBackendServiceClient} related factory configured with a particular key, * then uses the factory's create method to create its product. * * @param service type diff --git a/components/proxy/src/test/java/com/hotels/styx/admin/handlers/JsonHandlerTest.java b/components/proxy/src/test/java/com/hotels/styx/admin/handlers/JsonHandlerTest.java index a0cf2e5ee5..312f1e9e65 100644 --- a/components/proxy/src/test/java/com/hotels/styx/admin/handlers/JsonHandlerTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/admin/handlers/JsonHandlerTest.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.hotels.styx.api.Clock; +import com.hotels.styx.api.HttpRequest; import com.hotels.styx.server.HttpInterceptorContext; import org.testng.annotations.Test; @@ -26,12 +27,11 @@ import static com.hotels.styx.api.HttpRequest.get; import static com.hotels.styx.common.StyxFutures.await; -import static com.hotels.styx.support.api.HttpMessageBodies.bodyAsString; import static java.lang.System.currentTimeMillis; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import com.hotels.styx.api.HttpRequest; public class JsonHandlerTest { long time = currentTimeMillis(); @@ -102,11 +102,15 @@ private Supplier sequentialSupplier(T... elements) { } private String response(JsonHandler handler) { - return bodyAsString(await(handler.handle(get("/").build(), HttpInterceptorContext.create()).asCompletableFuture())); + return responseFor(handler, get("/").build()); } private String responseFor(JsonHandler handler, HttpRequest request) { - return bodyAsString(await(handler.handle(request, HttpInterceptorContext.create()).asCompletableFuture())); + return await( + handler.handle(request, HttpInterceptorContext.create()) + .flatMap(response -> response.toFullResponse(1000000)) + .map(response -> response.bodyAs(UTF_8)) + .asCompletableFuture()); } private static class Convertible { diff --git a/components/proxy/src/test/java/com/hotels/styx/proxy/interceptors/HttpMessageLoggingInterceptorTest.java b/components/proxy/src/test/java/com/hotels/styx/proxy/interceptors/HttpMessageLoggingInterceptorTest.java index faf64406d4..4dd2548ff4 100644 --- a/components/proxy/src/test/java/com/hotels/styx/proxy/interceptors/HttpMessageLoggingInterceptorTest.java +++ b/components/proxy/src/test/java/com/hotels/styx/proxy/interceptors/HttpMessageLoggingInterceptorTest.java @@ -20,7 +20,6 @@ import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.StyxObservable; import com.hotels.styx.server.HttpInterceptorContext; -import com.hotels.styx.support.api.HttpMessageBodies; import com.hotels.styx.support.matchers.LoggingTestSupport; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -128,6 +127,6 @@ public HttpInterceptor.Context context() { } private static void consume(StyxObservable resp) { - await(resp.map(HttpMessageBodies::bodyAsString).asCompletableFuture()); + await(resp.flatMap(it -> it.toFullResponse(1000000)).asCompletableFuture()); } } \ No newline at end of file diff --git a/components/server/src/main/java/com/hotels/styx/server/handlers/ClassPathResourceHandler.java b/components/server/src/main/java/com/hotels/styx/server/handlers/ClassPathResourceHandler.java index f2e0eb93e8..2dc1f8cced 100644 --- a/components/server/src/main/java/com/hotels/styx/server/handlers/ClassPathResourceHandler.java +++ b/components/server/src/main/java/com/hotels/styx/server/handlers/ClassPathResourceHandler.java @@ -16,23 +16,25 @@ package com.hotels.styx.server.handlers; import com.google.common.io.ByteStreams; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.FullHttpResponse; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; -import com.hotels.styx.api.StyxObservable; -import com.hotels.styx.common.http.handler.BaseHttpHandler; import com.hotels.styx.api.HttpResponseStatus; +import com.hotels.styx.common.http.handler.BaseHttpHandler; +import reactor.core.publisher.Flux; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_TYPE; -import static com.hotels.styx.server.handlers.MediaTypes.mediaTypeOf; import static com.hotels.styx.api.HttpResponseStatus.FORBIDDEN; import static com.hotels.styx.api.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static com.hotels.styx.api.HttpResponseStatus.NOT_FOUND; import static com.hotels.styx.api.HttpResponseStatus.OK; +import static com.hotels.styx.server.handlers.MediaTypes.mediaTypeOf; import static java.nio.charset.StandardCharsets.UTF_8; /** @@ -78,7 +80,7 @@ private static byte[] resourceBody(String path) throws IOException { private static HttpResponse error(HttpResponseStatus status) { return new HttpResponse.Builder(status) - .body(StyxObservable.of(status.description()), UTF_8) + .body(new ByteStream(Flux.just(new Buffer(status.description(), UTF_8)))) .build(); } diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoder.java b/components/server/src/main/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoder.java index 01f5f7bf7d..2349cb3b07 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoder.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoder.java @@ -16,6 +16,8 @@ package com.hotels.styx.server.netty.codec; import com.google.common.annotations.VisibleForTesting; +import com.hotels.styx.api.Buffers; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpVersion; import com.hotels.styx.api.Url; import com.hotels.styx.server.BadRequestException; @@ -47,12 +49,12 @@ import static com.google.common.collect.Iterables.size; import static com.hotels.styx.api.HttpHeaderNames.EXPECT; import static com.hotels.styx.api.HttpHeaderNames.HOST; -import static com.hotels.styx.api.StyxInternalObservables.fromRxObservable; import static com.hotels.styx.api.Url.Builder.url; import static com.hotels.styx.server.UniqueIdSuppliers.UUID_VERSION_ONE_SUPPLIER; import static com.hotels.styx.server.netty.codec.UnwiseCharsEncoder.IGNORE; import static java.util.Objects.requireNonNull; import static java.util.stream.StreamSupport.stream; +import static rx.RxReactiveStreams.toPublisher; /** * This {@link MessageToMessageDecoder} is responsible for decode {@link io.netty.handler.codec.http.HttpRequest} @@ -169,7 +171,7 @@ com.hotels.styx.api.HttpRequest.Builder makeAStyxRequestFrom(HttpRequest request .url(url) .version(toStyxVersion(request.protocolVersion())) .id(uniqueIdSupplier.get()) - .body(fromRxObservable(content)); + .body(new ByteStream(toPublisher(content.map(Buffers::fromByteBuf)))); stream(request.headers().spliterator(), false) .forEach(entry -> requestBuilder.addHeader(entry.getKey(), entry.getValue())); diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java index 2c6a5d6471..bb289a3057 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java @@ -17,6 +17,8 @@ import com.google.common.annotations.VisibleForTesting; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.ContentOverflowException; import com.hotels.styx.api.HttpHandler; import com.hotels.styx.api.HttpInterceptor; @@ -24,7 +26,6 @@ import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.HttpResponseStatus; import com.hotels.styx.api.MetricRegistry; -import com.hotels.styx.api.StyxObservable; import com.hotels.styx.api.exceptions.NoAvailableHostsException; import com.hotels.styx.api.exceptions.OriginUnreachableException; import com.hotels.styx.api.exceptions.ResponseTimeoutException; @@ -50,6 +51,7 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.TooLongFrameException; import org.slf4j.Logger; +import reactor.core.publisher.Flux; import rx.Observable; import rx.Subscriber; import rx.Subscription; @@ -495,7 +497,7 @@ private HttpResponse exceptionToResponse(Throwable exception, HttpRequest reques return responseEnhancer.enhance(HttpResponse.response(status), request) .header(CONTENT_LENGTH, message.getBytes(UTF_8).length) - .body(StyxObservable.of(message), UTF_8) + .body(new ByteStream(Flux.just(new Buffer(message, UTF_8)))) .build(); } diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java index b590e1808e..07e1f5ee0b 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpResponseWriter.java @@ -15,6 +15,7 @@ */ package com.hotels.styx.server.netty.connectors; +import com.hotels.styx.api.Buffers; import com.hotels.styx.api.HttpResponse; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -31,10 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import static com.hotels.styx.api.StyxInternalObservables.toRxObservable; import static io.netty.handler.codec.http.HttpHeaders.setTransferEncodingChunked; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static java.util.Objects.requireNonNull; +import static rx.RxReactiveStreams.toObservable; /** * Netty HTTP response writer. @@ -79,7 +80,7 @@ public CompletableFuture write(HttpResponse response) { } }); - Subscription subscriber = toRxObservable(response.body()).subscribe(new Subscriber() { + Subscription subscriber = toObservable(response.body()).map(Buffers::toByteBuf).subscribe(new Subscriber() { @Override public void onStart() { request(1); @@ -157,7 +158,7 @@ private void onWriteEmptyLastChunkOutcome(ChannelFuture writeOp) { return future; } catch (Throwable cause) { LOGGER.warn("Failed to convert response headers. response={}, Cause={}", new Object[]{response, cause}); - response.releaseContentBuffers(); + toObservable(response.body()).forEach(it -> Buffers.toByteBuf(it).release()); future.completeExceptionally(cause); return future; } diff --git a/components/server/src/test/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoderTest.java b/components/server/src/test/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoderTest.java index f79013d3cc..4105c33f98 100644 --- a/components/server/src/test/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoderTest.java +++ b/components/server/src/test/java/com/hotels/styx/server/netty/codec/NettyToStyxRequestDecoderTest.java @@ -16,9 +16,10 @@ package com.hotels.styx.server.netty.codec; import com.google.common.base.Strings; +import com.hotels.styx.api.Buffer; import com.hotels.styx.api.HttpHeader; import com.hotels.styx.api.HttpMethod; -import com.hotels.styx.api.StyxObservable; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.server.BadRequestException; import com.hotels.styx.server.UniqueIdSupplier; import io.netty.buffer.ByteBuf; @@ -50,7 +51,6 @@ import static com.google.common.base.Charsets.UTF_8; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Sets.newHashSet; -import static com.hotels.styx.api.StyxInternalObservables.toRxObservable; import static com.hotels.styx.api.RequestCookie.requestCookie; import static com.hotels.styx.server.UniqueIdSuppliers.fixedUniqueIdSupplier; import static com.hotels.styx.support.netty.HttpMessageSupport.httpMessageToBytes; @@ -71,6 +71,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static rx.RxReactiveStreams.toObservable; public class NettyToStyxRequestDecoderTest { private final UniqueIdSupplier uniqueIdSupplier = fixedUniqueIdSupplier("1"); @@ -345,24 +346,24 @@ private static HttpContent newHttpContent(String content) { } - private TestSubscriber subscribeTo(StyxObservable contentObservable) { - TestSubscriber subscriber = new TestSubscriber<>(); - toRxObservable(contentObservable).subscribe(subscriber); + private TestSubscriber subscribeTo(ByteStream contentStream) { + TestSubscriber subscriber = new TestSubscriber<>(); + toObservable(contentStream).subscribe(subscriber); return subscriber; } - private String subscribeAndRead(StyxObservable contentObservable) throws InterruptedException { + private String subscribeAndRead(ByteStream content) throws InterruptedException { CountDownLatch bodyCompletedLatch = new CountDownLatch(1); - StringBuilder contentBuilder = subscribeToContent(contentObservable, bodyCompletedLatch); + StringBuilder contentBuilder = subscribeToContent(content, bodyCompletedLatch); bodyCompletedLatch.await(); return contentBuilder.toString(); } - private static StringBuilder subscribeToContent(StyxObservable content, CountDownLatch onCompleteLatch) { + private static StringBuilder subscribeToContent(ByteStream contentStream, CountDownLatch onCompleteLatch) { StringBuilder builder = new StringBuilder(); - toRxObservable(content).subscribe(new Subscriber() { + toObservable(contentStream).subscribe(new Subscriber() { @Override public void onCompleted() { // no-op @@ -375,8 +376,8 @@ public void onError(Throwable e) { } @Override - public void onNext(ByteBuf byteBuf) { - builder.append(byteBuf.toString(UTF_8)); + public void onNext(Buffer buffer) { + builder.append(new String(buffer.content(), UTF_8)); } }); return builder; diff --git a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpResponseWriterTest.java b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpResponseWriterTest.java index 7678d2e381..5c27a487f7 100644 --- a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpResponseWriterTest.java +++ b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpResponseWriterTest.java @@ -18,10 +18,11 @@ import ch.qos.logback.classic.Level; import com.google.common.collect.ImmutableList; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.exceptions.TransportLostException; import com.hotels.styx.support.matchers.LoggingTestSupport; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -42,26 +43,26 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.Charsets.UTF_8; +import static com.hotels.styx.api.Buffers.toByteBuf; import static com.hotels.styx.api.HttpResponse.response; import static com.hotels.styx.api.HttpResponseStatus.OK; import static com.hotels.styx.api.ResponseCookie.responseCookie; -import static com.hotels.styx.api.StyxInternalObservables.fromRxObservable; import static com.hotels.styx.api.extension.Origin.newOriginBuilder; import static com.hotels.styx.support.matchers.LoggingEventMatcher.loggingEvent; -import static io.netty.buffer.Unpooled.copiedBuffer; import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT; import static java.net.InetAddress.getLoopbackAddress; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; +import static rx.RxReactiveStreams.toPublisher; public class HttpResponseWriterTest { private LoggingTestSupport LOGGER; - private PublishSubject contentObservable; + + private PublishSubject contentObservable; private Queue channelArgs; private AtomicBoolean channelRead; @@ -88,7 +89,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th CompletableFuture future = writer.write(response); assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); + contentObservable.onNext(new Buffer("aaa", UTF_8)); assertThat(future.isDone(), is(false)); contentObservable.onCompleted(); @@ -99,7 +100,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); assertThat(channelRead.get(), is(true)); } @@ -115,7 +116,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th CompletableFuture future = writer.write(response); assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); + contentObservable.onNext(new Buffer("aaa", UTF_8)); assertThat(future.isDone(), is(false)); contentObservable.onCompleted(); @@ -131,7 +132,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); assertThat(channelRead.get(), is(true)); } @@ -171,7 +172,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th writeAck(channelArgs); // For response headers assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); + contentObservable.onNext(new Buffer("aaa", UTF_8)); writeAck(channelArgs); // For content chunk assertThat(future.isDone(), is(false)); @@ -186,7 +187,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); assertThat(channelRead.get(), is(true)); } @@ -208,7 +209,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); } @@ -225,7 +226,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th writeAck(channelArgs); assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); + contentObservable.onNext(new Buffer("aaa", UTF_8)); assertThat(future.isDone(), is(false)); contentObservable.onCompleted(); @@ -239,7 +240,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); } @Test @@ -269,7 +270,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); assertThat(channelRead.get(), is(true)); List writeEvents = writeEventsCollector.writeEvents(); @@ -305,7 +306,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable.doOnUnsubscribe(() -> unsubscribed.set(true)))).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable.doOnUnsubscribe(() -> unsubscribed.set(true))))).build()); assertThat(channelRead.get(), is(true)); } @@ -313,8 +314,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th public void releasesContentChunksWhenFailsToConvertToNettyHeaders() throws Exception { CaptureHttpResponseWriteEventsHandler writeEventsCollector = new CaptureHttpResponseWriteEventsHandler(); - ByteBuf chunk1 = copiedBuffer("aaa", UTF_8); - ByteBuf chunk2 = copiedBuffer("aaa", UTF_8); + Buffer chunk1 = new Buffer("aaa", UTF_8); + Buffer chunk2 = new Buffer("aaa", UTF_8); AtomicBoolean unsubscribed = new AtomicBoolean(false); EmbeddedChannel ch = new EmbeddedChannel( @@ -334,8 +335,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th contentObservable.onCompleted(); assertThat(future.isDone(), is(true)); - assertThat(chunk1.refCnt(), is(0)); - assertThat(chunk2.refCnt(), is(0)); + assertThat(toByteBuf(chunk1).refCnt(), is(0)); + assertThat(toByteBuf(chunk2).refCnt(), is(0)); channelRead.set(true); } @@ -343,49 +344,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th ); HttpResponse.Builder response = response(OK).cookies(responseCookie(",,,,", ",,,,").build()); - ch.writeInbound(response.body(fromRxObservable(contentObservable.doOnUnsubscribe(() -> unsubscribed.set(true)))).build()); - assertThat(channelRead.get(), is(true)); - } - - @Test - public void requestsMoreContentAfterSuccessfulWrite() throws Exception { - AtomicLong requested = new AtomicLong(0L); - - EmbeddedChannel ch = new EmbeddedChannel( - new CaptureChannelArgumentsHandler(channelArgs), - new LoggingHandler(), - new SimpleChannelInboundHandler() { - @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) throws Exception { - HttpResponseWriter writer = new HttpResponseWriter(ctx); - CompletableFuture future = writer.write(response); - assertThat(future.isDone(), is(false)); - writeAck(channelArgs); // For response headers - - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); - assertThat(future.isDone(), is(false)); - assertThat(requested.get(), is(1L)); - - contentObservable.onNext(copiedBuffer("bbb", UTF_8)); - assertThat(future.isDone(), is(false)); - assertThat(requested.get(), is(1L)); - - writeAck(channelArgs); // For content chunk: aaa - assertThat(requested.get(), is(2L)); - - writeAck(channelArgs); // For content chunk: bbb - assertThat(requested.get(), is(3L)); - - contentObservable.onCompleted(); - writeAck(channelArgs); // For EMPTY_LAST_CHUNK - assertThat(future.isDone(), is(true)); - - channelRead.set(true); - } - } - ); - - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable.doOnRequest(requested::addAndGet))).build()); + ch.writeInbound(response.body(new ByteStream(toPublisher(contentObservable.doOnUnsubscribe(() -> unsubscribed.set(true))))).build()); assertThat(channelRead.get(), is(true)); } @@ -399,10 +358,10 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th CompletableFuture future = writer.write(response); assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("aaa", UTF_8)); + contentObservable.onNext(new Buffer("aaa", UTF_8)); assertThat(future.isDone(), is(false)); - contentObservable.onNext(copiedBuffer("bbbb", UTF_8)); + contentObservable.onNext(new Buffer("bbbb", UTF_8)); assertThat(future.isDone(), is(false)); contentObservable.onError(new TransportLostException( @@ -415,7 +374,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpResponse response) th } ); - ch.writeInbound(response(OK).body(fromRxObservable(contentObservable)).build()); + ch.writeInbound(response(OK).body(new ByteStream(toPublisher(contentObservable))).build()); assertThat(LOGGER.lastMessage(), is( loggingEvent( diff --git a/pom.xml b/pom.xml index c4cb771043..e3b667b278 100755 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,14 @@ 4.1.15.Final 2.0.6.Final 1.1.6 + 1.0.2 + 3.2.0.RELEASE + + + 1.2.51 + true + + 3.1.5 1.1.2 @@ -155,7 +163,6 @@ ${project.build.directory}/jacoco-it.exec java - 91 80 90 @@ -173,9 +180,7 @@ false - - 1.2.51 - true + 1.1.1 @@ -238,10 +243,35 @@ ${rxjava.version} + + io.reactivex + rxjava-reactive-streams + ${rxjava-reactive-streams.version} + + + + org.reactivestreams + reactive-streams + ${reactive-streams.version} + + + + io.projectreactor + reactor-core + ${reactor.version} + + + + io.projectreactor + reactor-test + ${reactor.version} + test + + com.fasterxml.uuid java-uuid-generator - 3.1.5 + ${java-uuid-generator.version} diff --git a/support/api-testsupport/src/main/java/com/hotels/styx/support/api/HttpMessageBodies.java b/support/api-testsupport/src/main/java/com/hotels/styx/support/api/HttpMessageBodies.java deleted file mode 100644 index 3bc84f582a..0000000000 --- a/support/api-testsupport/src/main/java/com/hotels/styx/support/api/HttpMessageBodies.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.support.api; - -import com.hotels.styx.api.HttpResponse; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static java.nio.charset.StandardCharsets.UTF_8; -import com.hotels.styx.api.HttpRequest; - - -/** - * Provides a support method for dealing with streaming HTTP request bodies. - */ -public final class HttpMessageBodies { - /** - * Return the body of {@code message} as string. Note that this will buffer all the message body in memory. - * - * @param message the message to read the body from - * @return the body of the message as string - */ - public static String bodyAsString(HttpRequest message) { - return await(message.toFullRequest(0x100000) - .asCompletableFuture()) - .bodyAs(UTF_8); - } - - public static String bodyAsString(HttpResponse message) { - return await(message.toFullResponse(0x100000) - .asCompletableFuture()) - .bodyAs(UTF_8); - } - - private static T await(CompletableFuture future) { - try { - return future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - private HttpMessageBodies() { - } -} diff --git a/support/api-testsupport/src/test/java/com/hotels/styx/support/api/HttpMessageBodiesTest.java b/support/api-testsupport/src/test/java/com/hotels/styx/support/api/HttpMessageBodiesTest.java deleted file mode 100644 index 9436a37a3e..0000000000 --- a/support/api-testsupport/src/test/java/com/hotels/styx/support/api/HttpMessageBodiesTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.support.api; - -import com.hotels.styx.api.FullHttpRequest; -import com.hotels.styx.api.HttpRequest; -import com.hotels.styx.api.HttpResponse; -import com.hotels.styx.api.StyxObservable; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.testng.annotations.Test; - -import static com.hotels.styx.api.FullHttpResponse.response; -import static com.hotels.styx.api.HttpRequest.post; -import static com.hotels.styx.support.api.HttpMessageBodies.bodyAsString; -import static io.netty.util.CharsetUtil.UTF_8; -import static java.util.Arrays.stream; -import static java.util.stream.Collectors.toList; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -public class HttpMessageBodiesTest { - @Test - public void createsRequestBodyString() { - HttpRequest request = FullHttpRequest.post("/") - .body("Hello, World!", UTF_8) - .build() - .toStreamingRequest(); - - assertThat(bodyAsString(request), is("Hello, World!")); - } - - @Test - public void createsRequestBodyStringFromObservable() { - HttpRequest request = post("/") - .body(byteBufObservable("Hello,", " Wor", "ld!")) - .build(); - - assertThat(bodyAsString(request), is("Hello, World!")); - } - - @Test - public void createsResponseBodyString() { - HttpResponse response = response() - .body("Hello, World!", UTF_8) - .build() - .toStreamingResponse(); - - assertThat(bodyAsString(response), is("Hello, World!")); - } - - @Test - public void createsResponseBodyStringFromObservable() { - HttpResponse response = HttpResponse.response() - .body(byteBufObservable("Hello,", " Wor", "ld!")) - .build(); - - assertThat(bodyAsString(response), is("Hello, World!")); - } - - - private static StyxObservable byteBufObservable(String... strings) { - return StyxObservable.from(stream(strings) - .map(HttpMessageBodiesTest::buf) - .collect(toList())); - } - - private static ByteBuf buf(CharSequence charSequence) { - return Unpooled.copiedBuffer(charSequence, UTF_8); - } -} \ No newline at end of file diff --git a/support/origins-starter-app/src/main/java/com/hotels/styx/support/origins/AppHandler.java b/support/origins-starter-app/src/main/java/com/hotels/styx/support/origins/AppHandler.java index 00337f3b79..0dd61fb6a0 100644 --- a/support/origins-starter-app/src/main/java/com/hotels/styx/support/origins/AppHandler.java +++ b/support/origins-starter-app/src/main/java/com/hotels/styx/support/origins/AppHandler.java @@ -15,14 +15,17 @@ */ package com.hotels.styx.support.origins; +import com.hotels.styx.api.Buffer; +import com.hotels.styx.api.ByteStream; import com.hotels.styx.api.HttpHandler; import com.hotels.styx.api.HttpInterceptor; import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; +import com.hotels.styx.api.HttpResponseStatus; import com.hotels.styx.api.StyxObservable; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.common.http.handler.StaticBodyHttpHandler; -import com.hotels.styx.api.HttpResponseStatus; +import reactor.core.publisher.Flux; import static com.google.common.net.MediaType.HTML_UTF_8; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; @@ -57,11 +60,11 @@ public StyxObservable handle(HttpRequest request, HttpInterceptor. request.queryParam("status").ifPresent(status -> responseBuilder .status(httpResponseStatus(status)) - .body(StyxObservable.of("Returning requested status (" + status + ")"), UTF_8) + .body(new ByteStream(Flux.just(new Buffer("Returning requested status (" + status + ")", UTF_8)))) ); request.queryParam("length").ifPresent(length -> - responseBuilder.body(StyxObservable.of(generateContent(parseInt(length))), UTF_8) + responseBuilder.body(new ByteStream(Flux.just(new Buffer(generateContent(parseInt(length)), UTF_8)))) ); return responseBuilder.build(); diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/client/OriginClosesConnectionSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/client/OriginClosesConnectionSpec.scala index 4e55c135c6..70f8cc32f5 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/client/OriginClosesConnectionSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/client/OriginClosesConnectionSpec.scala @@ -18,7 +18,7 @@ package com.hotels.styx.client import ch.qos.logback.classic.Level import com.google.common.base.Charsets._ import com.hotels.styx.api.FullHttpRequest.get -import com.hotels.styx.api.{HttpResponse, extension} +import com.hotels.styx.api.{Buffer, HttpResponse, extension} import com.hotels.styx.api.extension.ActiveOrigins import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancer import com.hotels.styx.api.HttpResponseStatus.OK @@ -31,7 +31,6 @@ import com.hotels.styx.support.configuration.{BackendService, HttpBackend, Origi import com.hotels.styx.support.matchers.LoggingTestSupport import com.hotels.styx.support.observables.ImplicitRxConversions import com.hotels.styx.{DefaultStyxConfiguration, StyxClientSupplier, StyxProxySpec} -import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled._ import io.netty.channel.ChannelFutureListener.CLOSE import io.netty.channel.ChannelHandlerContext @@ -41,8 +40,8 @@ import io.netty.handler.codec.http._ import org.scalatest._ import org.scalatest.concurrent.Eventually import rx.observers.TestSubscriber -import com.hotels.styx.api.StyxInternalObservables.toRxObservable import com.hotels.styx.api.exceptions.ResponseTimeoutException +import rx.RxReactiveStreams.toObservable import scala.compat.java8.StreamConverters._ import scala.concurrent.duration._ @@ -117,7 +116,7 @@ class OriginClosesConnectionSpec extends FunSuite .build val responseSubscriber = new TestSubscriber[HttpResponse]() - val contentSubscriber = new TestSubscriber[ByteBuf](1) + val contentSubscriber = new TestSubscriber[Buffer](1) val startTime = System.currentTimeMillis() val responseObservable = styxClient.sendRequest( @@ -127,7 +126,7 @@ class OriginClosesConnectionSpec extends FunSuite .toStreamingRequest) responseObservable - .doOnNext((t: HttpResponse) => toRxObservable(t.body()).subscribe(contentSubscriber)) + .doOnNext((t: HttpResponse) => toObservable(t.body()).subscribe(contentSubscriber)) .subscribe(responseSubscriber) responseSubscriber.awaitTerminalEvent() diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginContentOverflowSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginContentOverflowSpec.scala index 4b89bb29ca..61c6b2330d 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginContentOverflowSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginContentOverflowSpec.scala @@ -19,13 +19,10 @@ import java.nio.charset.StandardCharsets.UTF_8 import com.hotels.styx.MockServer.responseSupplier import com.hotels.styx.api.FullHttpRequest.get -import com.hotels.styx.api.{HttpResponse, StyxInternalObservables} -import com.hotels.styx.api.HttpResponse._ +import com.hotels.styx.api._ import com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY -import com.hotels.styx.common.FreePorts._ import com.hotels.styx.support.configuration.{HttpBackend, Origins, StyxConfig} import com.hotels.styx.{MockServer, StyxProxySpec} -import io.netty.buffer.{ByteBuf, Unpooled} import com.hotels.styx.api.HttpResponseStatus._ import org.scalatest.FunSpec import org.scalatest.concurrent.Eventually @@ -33,7 +30,7 @@ import rx.Observable import rx.lang.scala.JavaConversions._ import scala.concurrent.duration._ -import com.hotels.styx.api.StyxInternalObservables.fromRxObservable +import rx.RxReactiveStreams.toPublisher class AggregatingPluginContentOverflowSpec extends FunSpec with StyxProxySpec @@ -67,17 +64,16 @@ class AggregatingPluginContentOverflowSpec extends FunSpec mockServer.stub("/body", responseSupplier( () => { - HttpResponse.response(OK).body( - StyxInternalObservables.fromRxObservable(toJavaObservable( - delay(500.millis, - Seq( - buf("a" * 1000), - buf("b" * 1000), - buf("c" * 1000), - buf("d" * 1000), - buf("e" * 1000), - buf("f" * 1000)))) - .asInstanceOf[Observable[ByteBuf]])).build() + HttpResponse.response(OK).body(new ByteStream(toPublisher(toJavaObservable( + delay(500.millis, + Seq( + buf("a" * 1000), + buf("b" * 1000), + buf("c" * 1000), + buf("d" * 1000), + buf("e" * 1000), + buf("f" * 1000)))) + .asInstanceOf[Observable[Buffer]]))).build() })) val request = get(styxServer.routerURL("/body")) @@ -95,11 +91,11 @@ class AggregatingPluginContentOverflowSpec extends FunSpec } } - def buf(string: String): ByteBuf = Unpooled.copiedBuffer(string, UTF_8) + def buf(string: String): Buffer = new Buffer(string, UTF_8) import rx.lang.scala.Observable - def delay(time: Duration, buffers: Seq[ByteBuf]) = { + def delay(time: Duration, buffers: Seq[Buffer]) = { Observable.interval(time) .zip(Observable.from(buffers)) .map { case (i, buf) => buf } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginSpec.scala index 08905db59c..a2ebcc82e2 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AggregatingPluginSpec.scala @@ -18,15 +18,16 @@ package com.hotels.styx.plugins import java.nio.charset.StandardCharsets.UTF_8 import com.hotels.styx.MockServer.responseSupplier +import com.hotels.styx.api.{Buffer, ByteStream} import com.hotels.styx.api.FullHttpRequest.get import com.hotels.styx.api.HttpResponse.response -import com.hotels.styx.api.StyxObservable import com.hotels.styx.api.HttpResponseStatus.OK import com.hotels.styx.support.configuration.{HttpBackend, Origins, StyxConfig} import com.hotels.styx.{MockServer, StyxProxySpec} -import io.netty.buffer.{ByteBuf, Unpooled} import org.scalatest.FunSpec import org.scalatest.concurrent.Eventually +import rx.RxReactiveStreams.toPublisher +import rx.Observable import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -65,7 +66,7 @@ class AggregatingPluginSpec extends FunSpec it("Gets response from aggregating plugin (with body)") { mockServer.stub("/body", responseSupplier( - () => response(OK).body(StyxObservable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)).build() + () => response(OK).body(new ByteStream(toPublisher(Observable.from(Seq(chunk("a"), chunk("b"), chunk("c"), chunk("d"), chunk("e")).asJava)))).build() )) val request = get(styxServer.routerURL("/body")).build() @@ -78,10 +79,10 @@ class AggregatingPluginSpec extends FunSpec } } - def chunk(from: String): ByteBuf = buf(chunkString(from)) + def chunk(from: String): Buffer = buf(chunkString(from)) def chunkString(from: String): String = from * 500 - def buf(string: String): ByteBuf = Unpooled.copiedBuffer(string, UTF_8) + def buf(string: String): Buffer = new Buffer(string, UTF_8) } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala index c9d4861d49..0155903004 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncRequestContentSpec.scala @@ -21,15 +21,14 @@ import com.github.tomakehurst.wiremock.client.WireMock._ import com.hotels.styx._ import com.hotels.styx.api.HttpInterceptor.Chain import com.hotels.styx.api.FullHttpRequest.get -import com.hotels.styx.api.StyxInternalObservables.{fromRxObservable, toRxObservable} -import com.hotels.styx.api.{HttpRequest, HttpResponse, StyxObservable} +import com.hotels.styx.api.{ByteStream, _} import com.hotels.styx.support.backends.FakeHttpServer import com.hotels.styx.support.configuration.{HttpBackend, Origins, StyxConfig} import com.hotels.styx.support.server.UrlMatchingStrategies._ -import io.netty.buffer.ByteBuf import io.netty.handler.codec.http.HttpHeaders.Names._ import io.netty.handler.codec.http.HttpHeaders.Values._ import org.scalatest.{BeforeAndAfterAll, FunSpec} +import rx.RxReactiveStreams.{toObservable, toPublisher} import scala.concurrent.duration._ import scala.compat.java8.FutureConverters.CompletionStageOps @@ -87,8 +86,8 @@ import scala.compat.java8.FunctionConverters.asJavaFunction class AsyncRequestContentDelayPlugin extends PluginAdapter { override def intercept(request: HttpRequest, chain: Chain): StyxObservable[HttpResponse] = { - val contentTransformation: rx.Observable[ByteBuf] = - toRxObservable(request.body()) + val contentTransformation: rx.Observable[Buffer] = + toObservable(request.body()) .observeOn(ComputationScheduler()) .flatMap(byteBuf => { Thread.sleep(1000) @@ -97,7 +96,7 @@ class AsyncRequestContentDelayPlugin extends PluginAdapter { // This was split apart as it no longer compiles without the type annotation StyxObservable[HttpRequest] val mapped: StyxObservable[HttpRequest] = StyxObservable.of(request) - .map(asJavaFunction((request: HttpRequest) => request.newBuilder().body(fromRxObservable(contentTransformation)).build())) + .map(asJavaFunction((request: HttpRequest) => request.newBuilder().body(new ByteStream(toPublisher(contentTransformation))).build())) mapped .flatMap(asJavaFunction((request: HttpRequest) => chain.proceed(request))) diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncResponseContentSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncResponseContentSpec.scala index 1897cae584..a7f1b2f4eb 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncResponseContentSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/plugins/AsyncResponseContentSpec.scala @@ -17,22 +17,20 @@ package com.hotels.styx.plugins import java.nio.charset.StandardCharsets.UTF_8 +import _root_.io.netty.handler.codec.http.HttpHeaders.Names._ +import _root_.io.netty.handler.codec.http.HttpHeaders.Values._ import com.github.tomakehurst.wiremock.client.WireMock._ -import com.hotels.styx.api.HttpInterceptor.Chain import com.hotels.styx.api.FullHttpRequest.get +import com.hotels.styx.api.HttpInterceptor.Chain import com.hotels.styx.api._ import com.hotels.styx.support._ -import com.hotels.styx.support.api.BlockingObservables.waitForResponse import com.hotels.styx.support.backends.FakeHttpServer import com.hotels.styx.support.configuration.{HttpBackend, Origins, StyxConfig} import com.hotels.styx.support.server.UrlMatchingStrategies._ import com.hotels.styx.{PluginAdapter, StyxClientSupplier, StyxProxySpec} -import _root_.io.netty.buffer.ByteBuf -import _root_.io.netty.handler.codec.http.HttpHeaders.Names._ -import _root_.io.netty.handler.codec.http.HttpHeaders.Values._ -import com.hotels.styx.api.StyxInternalObservables.{fromRxObservable, toRxObservable} import org.scalatest.FunSpec import rx.Observable +import rx.RxReactiveStreams.{toObservable, toPublisher} import rx.schedulers.Schedulers import scala.concurrent.duration._ @@ -88,14 +86,14 @@ class AsyncDelayPlugin extends PluginAdapter { chain.proceed(request) .flatMap(asJavaFunction((response: HttpResponse) => { - val transformedContent: Observable[ByteBuf] = toRxObservable(response.body()) + val transformedContent: Observable[Buffer] = toObservable(response.body()) .observeOn(Schedulers.computation()) - .flatMap((byteBuf: ByteBuf) => { + .flatMap((buffer: Buffer) => { Thread.sleep(1000) - Observable.just(byteBuf) + Observable.just(buffer) }) - StyxObservable.of(response.newBuilder().body(fromRxObservable(transformedContent)).build()) + StyxObservable.of(response.newBuilder().body(new ByteStream(toPublisher(transformedContent))).build()) })) } } diff --git a/system-tests/e2e-testsupport/src/main/java/com/hotels/styx/utils/handlers/ContentDigestHandler.java b/system-tests/e2e-testsupport/src/main/java/com/hotels/styx/utils/handlers/ContentDigestHandler.java index f1bca63d4b..95eead4219 100644 --- a/system-tests/e2e-testsupport/src/main/java/com/hotels/styx/utils/handlers/ContentDigestHandler.java +++ b/system-tests/e2e-testsupport/src/main/java/com/hotels/styx/utils/handlers/ContentDigestHandler.java @@ -15,7 +15,9 @@ */ package com.hotels.styx.utils.handlers; +import com.hotels.styx.api.FullHttpRequest; import com.hotels.styx.api.FullHttpResponse; +import com.hotels.styx.api.HttpRequest; import com.hotels.styx.api.HttpResponse; import com.hotels.styx.api.extension.Origin; import com.hotels.styx.common.http.handler.BaseHttpHandler; @@ -25,10 +27,9 @@ import static com.google.common.net.HttpHeaders.CONTENT_TYPE; import static com.google.common.net.MediaType.HTML_UTF_8; import static com.hotels.styx.api.HttpResponseStatus.OK; -import static com.hotels.styx.support.api.HttpMessageBodies.bodyAsString; +import static com.hotels.styx.common.StyxFutures.await; import static java.lang.String.format; import static java.util.UUID.randomUUID; -import com.hotels.styx.api.HttpRequest; public class ContentDigestHandler extends BaseHttpHandler { private final Origin origin; @@ -39,12 +40,12 @@ public ContentDigestHandler(Origin origin) { @Override protected HttpResponse doHandle(HttpRequest request) { - String requestBody = bodyAsString(request); + FullHttpRequest fullRequest = await(request.toFullRequest(0x100000).asCompletableFuture()); String responseBody = format("Response From %s - %s, received content digest: %s", origin.hostAndPortString(), randomUUID(), - requestBody.hashCode()); + fullRequest.bodyAs(UTF_8).hashCode()); return FullHttpResponse.response(OK) .header(CONTENT_TYPE, HTML_UTF_8.toString())