Skip to content

Commit

Permalink
A ByteStream abstraction to represent streaming content. (#298)
Browse files Browse the repository at this point in the history
* Add a Buffer class to encapsulate Netty ByteBuf.
* Add a ByteStream class to represent HTTP content streams.
* Add a Reactor-Core dependency, in order to use Flux to shore up ByteStream class.
* Remove HttpMessageBodies class.
  • Loading branch information
mikkokar authored Oct 11, 2018
1 parent 2271ed1 commit 5d9b79c
Show file tree
Hide file tree
Showing 45 changed files with 1,091 additions and 753 deletions.
22 changes: 22 additions & 0 deletions components/api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
73 changes: 73 additions & 0 deletions components/api/src/main/java/com/hotels/styx/api/Buffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import io.netty.buffer.ByteBuf;

import java.nio.charset.Charset;

import static io.netty.buffer.Unpooled.copiedBuffer;
import static java.util.Objects.requireNonNull;

/**
* A byte buffer.
*
*/
public final class Buffer {
private final ByteBuf delegate;

Buffer(ByteBuf byteBuf) {
this.delegate = requireNonNull(byteBuf);
}

/**
* Creates a new Buffer from {@link String} content with specified character encoding.
*
* @param content content
* @param charset desired character encoding
*/
public Buffer(String content, Charset charset) {
this(copiedBuffer(content, charset));
}

/**
* Returns a size of the Buffer in bytes.
* @return a size in bytes
*/
public int size() {
return delegate.readableBytes();
}

/**
* Returns buffer content as array of bytes.
*
* @return a byte array
*/
public byte[] content() {
byte[] bytes = new byte[delegate.readableBytes()];
delegate.getBytes(delegate.readerIndex(), bytes);
return bytes;
}

/**
* The underlying Netty ByteBuf.
*
* @return a Netty ByteBuf object
*/
ByteBuf delegate() {
return delegate;
}
}
139 changes: 139 additions & 0 deletions components/api/src/main/java/com/hotels/styx/api/ByteStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

/**
* A stream of Styx byte {@link Buffer} objects constituting a HTTP message body.
*
* This {@code ByteStream} class implements a reactive streams {@link Publisher} interface,
* therefore being interoperable with other conforming libraries such as Reactor and
* Rx Java 2.0.
*
* The class provides a set of operations to transform and inspect the byte stream.
*
* The class also provides methods for consuming the stream.
**
*/
public class ByteStream implements Publisher<Buffer> {
private final Publisher<Buffer> stream;

/**
* Create a new {@code ByteStream} from a reactive streams {@link Publisher}.
*
* @param stream a reactive streams {@link Publisher}
*/
public ByteStream(Publisher<Buffer> stream) {
this.stream = requireNonNull(stream);
}

/**
* Transform the stream by performing a mapping operation on each {@link Buffer} object.
*
* The mapping operation automatically maintains the @{link Buffer} reference counts as
* follows:
*
* <ul>
* <li> When the mapping function returns a new {@link Buffer} instance, the reference count for
* the old one is automatically decremented.</li>
* <li> When the mapping function modifies the {@link Buffer} in place, returning the same instance
* back, the reference count is unchanged.</li>
* </ul>
*
* @param mapping a mapping function
*
* @return a new, mapped {@code ByteStream} object
*/
public ByteStream map(Function<Buffer, Buffer> mapping) {
return new ByteStream(Flux.from(stream).map(releaseOldBuffers(mapping)));
}

private static Function<Buffer, Buffer> releaseOldBuffers(Function<Buffer, Buffer> mapping) {
return buffer -> {
Buffer buffer2 = mapping.apply(buffer);
if (buffer != buffer2) {
buffer.delegate().release();
}
return buffer2;
};
}

/**
* Transform the stream by dropping all {@link Buffer} objects.
*
* The {@code drop} returns a new {@code ByteStream} object with all upstream
* buffers removed. The {@code drop} automatically decrements the reference
* counts for each dropped {@link Buffer}.
*
* @return an empty {@link ByteStream}
*/
public ByteStream drop() {
return new ByteStream(Flux.from(stream)
.doOnNext(buffer -> buffer.delegate().release())
.filter(buffer -> false));
}

/**
* Run a provided action at the end of the byte stream.
*
* The provided action must accept an {@code Optional<Throwable>} argument,
* which is be set to {@code Optional.empty} if this stream finished successfully,
* or an {@code Optional.of(cause)} when this stream terminated with an error.
*
* @param action an action function
*
* @return an unmodified {@code ByteStream} with an action function attached
*/
public ByteStream doOnEnd(Consumer<Optional<Throwable>> action) {
return new ByteStream(Flux.from(this.stream)
.doOnError(cause -> action.accept(Optional.of(cause)))
.doOnComplete(() -> action.accept(Optional.empty()))
);
}

/**
* Consumes the stream by collecting it into an aggregate {@link Buffer} object.
*
* The aggregate {@link Buffer} object must be released after use.
*
* @param maxContentBytes maximum size for the aggregated buffer
* @return a future of aggregated buffer
*/
public CompletableFuture<Buffer> aggregate(int maxContentBytes) {
return new ByteStreamAggregator(this.stream, maxContentBytes)
.apply();
}

/**
* Consume the {@link ByteStream} by providing a reactive streams {@link Subscriber}.
*
* @param subscriber a reactive streams {@link Subscriber}
*/
@Override
public void subscribe(Subscriber<? super Buffer> subscriber) {
stream.subscribe(subscriber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx.api;

import io.netty.buffer.CompositeByteBuf;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.netty.buffer.Unpooled.compositeBuffer;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

class ByteStreamAggregator implements Subscriber<Buffer> {
private final Publisher<Buffer> upstream;
private final int maxSize;
private final CompletableFuture<Buffer> future = new CompletableFuture<>();
private final AtomicBoolean active = new AtomicBoolean();
private final CompositeByteBuf aggregated = compositeBuffer();
private Subscription subscription;

ByteStreamAggregator(Publisher<Buffer> upstream, int maxSize) {
this.upstream = requireNonNull(upstream);
this.maxSize = maxSize;
}

public CompletableFuture<Buffer> apply() {
if (active.compareAndSet(false, true)) {
this.upstream.subscribe(this);
return future;
} else {
throw new IllegalStateException("ByteStreamAggregator may only be started once.");
}
}

@Override
public void onSubscribe(Subscription subscription) {
if (this.subscription == null) {
this.subscription = subscription;
this.subscription.request(Long.MAX_VALUE);
} else {
subscription.cancel();
throw new IllegalStateException("ByteStreamAggregator supports only one Producer instance.");
}
}

@Override
public void onNext(Buffer part) {
long newSize = aggregated.readableBytes() + part.size();

if (newSize > maxSize) {
part.delegate().release();
aggregated.release();
subscription.cancel();
this.future.completeExceptionally(
new ContentOverflowException(format("Maximum content size exceeded. Maximum size allowed is %d bytes.", maxSize)));
} else {
aggregated.addComponent(part.delegate());
aggregated.writerIndex(aggregated.writerIndex() + part.size());
}
}

@Override
public void onError(Throwable cause) {
aggregated.release();
subscription.cancel();
future.completeExceptionally(cause);
}

@Override
public void onComplete() {
future.complete(new Buffer(aggregated));
}

}
Loading

0 comments on commit 5d9b79c

Please sign in to comment.