From 023bb54ff0855267afc7cebdc7c6e7f5a1b83fac Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Wed, 22 Jun 2016 20:57:41 -0700 Subject: [PATCH] Using RxNetty for tcp transport (#101) * Using RxNetty for tcp transport Current TCP transport implementation lacks a few critical features around insights and network flow control. Since RxNetty already has these features, it makes sense to use it. * Merge branch 'master' of https://github.com/ReactiveSocket/reactivesocket-java into rxnetty # Conflicts: # reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java # reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java # reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java * Review comments Also updated to rxnetty-0.5.2-rc.3 --- reactivesocket-examples/build.gradle | 1 + .../reactivesocket/examples/EchoClient.java | 41 +++-- .../src/main/resources/log4j.properties | 20 +++ .../java/io/reactivesocket/test/TestUtil.java | 7 + reactivesocket-transport-tcp/build.gradle | 3 +- .../transport/tcp/EchoServer.java | 38 ---- .../transport/tcp/EchoServerHandler.java | 68 ------- .../transport/tcp/HttpServerHandler.java | 22 --- .../transport/tcp/ObserverSubscriber.java | 46 +++++ .../tcp/ReactiveSocketFrameCodec.java | 63 +++++++ .../tcp/ReactiveSocketLengthCodec.java | 28 +++ .../transport/tcp/TcpDuplexConnection.java | 93 ++++++++++ .../tcp/client/ClientTcpDuplexConnection.java | 166 ------------------ .../client/ReactiveSocketClientHandler.java | 60 ------- .../client/TcpReactiveSocketConnector.java | 135 ++++++++------ .../server/ReactiveSocketServerHandler.java | 97 ---------- .../tcp/server/ServerTcpDuplexConnection.java | 123 ------------- .../tcp/server/TcpReactiveSocketServer.java | 125 +++++++++++++ .../transport/tcp/ClientServerTest.java | 155 +++------------- .../transport/tcp/ClientSetupRule.java | 78 ++++++++ .../transport/tcp/PayloadImpl.java | 56 ++++++ .../io/reactivesocket/transport/tcp/Ping.java | 38 ++-- .../io/reactivesocket/transport/tcp/Pong.java | 128 +++----------- .../transport/tcp/TestRequestHandler.java | 61 +++++++ .../build.gradle | 1 + .../server/ReactiveSocketServerHandler.java | 6 - 26 files changed, 750 insertions(+), 909 deletions(-) create mode 100644 reactivesocket-examples/src/main/resources/log4j.properties delete mode 100644 reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServer.java delete mode 100644 reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java delete mode 100644 reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/HttpServerHandler.java create mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java create mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketFrameCodec.java create mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketLengthCodec.java create mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java delete mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java delete mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java delete mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java delete mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ServerTcpDuplexConnection.java create mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java diff --git a/reactivesocket-examples/build.gradle b/reactivesocket-examples/build.gradle index 65cc300f2..824b5ea2a 100644 --- a/reactivesocket-examples/build.gradle +++ b/reactivesocket-examples/build.gradle @@ -6,4 +6,5 @@ dependencies { compile project(':reactivesocket-transport-tcp') compile project(':reactivesocket-test') + runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.21' } diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java index 47b9628ea..9d53cdc12 100644 --- a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java @@ -15,46 +15,53 @@ */ package io.reactivesocket.examples; -import io.netty.channel.nio.NioEventLoopGroup; +import io.reactivesocket.ConnectionSetupHandler; import io.reactivesocket.ConnectionSetupPayload; import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; import io.reactivesocket.client.ClientBuilder; +import io.reactivesocket.test.TestUtil; import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; import io.reactivesocket.util.Unsafe; -import io.reactivesocket.test.TestUtil; -import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; public final class EchoClient { - private static Publisher> source(SocketAddress sa) { - return sub -> sub.onNext(Collections.singletonList(sa)); - } - public static void main(String... args) throws Exception { - InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8888); + + ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> { + return new RequestHandler.Builder() + .withRequestResponse( + payload -> RxReactiveStreams.toPublisher(Observable.just(payload))) + .build(); + }; + + SocketAddress serverAddress = TcpReactiveSocketServer.create() + .start(setupHandler) + .getServerAddress(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); - TcpReactiveSocketConnector tcp = - new TcpReactiveSocketConnector(new NioEventLoopGroup(8), setupPayload, System.err::println); + TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); ReactiveSocket client = ClientBuilder.instance() - .withSource(source(address)) + .withSource(RxReactiveStreams.toPublisher(Observable.just(Collections.singletonList(serverAddress)))) .withConnector(tcp) .build(); Unsafe.awaitAvailability(client); Payload request = TestUtil.utf8EncodedPayload("Hello", "META"); - Payload response = Unsafe.blockingSingleWait(client.requestResponse(request), 1, TimeUnit.SECONDS); - - System.out.println(response); + RxReactiveStreams.toObservable(client.requestResponse(request)) + .map(TestUtil::dataAsString) + .toBlocking() + .forEach(System.out::println); } } diff --git a/reactivesocket-examples/src/main/resources/log4j.properties b/reactivesocket-examples/src/main/resources/log4j.properties new file mode 100644 index 000000000..469efe201 --- /dev/null +++ b/reactivesocket-examples/src/main/resources/log4j.properties @@ -0,0 +1,20 @@ +# +# Copyright 2015 Netflix, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%c %d{dd MMM yyyy HH:mm:ss,SSS} %5p [%t] (%F:%L) - %m%n \ No newline at end of file diff --git a/reactivesocket-test/src/main/java/io/reactivesocket/test/TestUtil.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestUtil.java index 6100102af..6c734dde0 100644 --- a/reactivesocket-test/src/main/java/io/reactivesocket/test/TestUtil.java +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestUtil.java @@ -63,6 +63,13 @@ public static Payload utf8EncodedPayload(final String data, final String metadat return new PayloadImpl(data, metadata); } + public static String dataAsString(Payload payload) { + ByteBuffer data = payload.getData(); + byte[] dst = new byte[data.remaining()]; + data.get(dst); + return new String(dst); + } + public static String byteToString(ByteBuffer byteBuffer) { byteBuffer = byteBuffer.duplicate(); diff --git a/reactivesocket-transport-tcp/build.gradle b/reactivesocket-transport-tcp/build.gradle index 5bea1d904..75d9c0356 100644 --- a/reactivesocket-transport-tcp/build.gradle +++ b/reactivesocket-transport-tcp/build.gradle @@ -1,7 +1,6 @@ dependencies { compile project(':reactivesocket-core') - compile 'io.netty:netty-handler:4.1.0.CR7' - compile 'io.netty:netty-codec-http:4.1.0.CR7' + compile 'io.reactivex:rxnetty-tcp:0.5.2-rc.3' testCompile project(':reactivesocket-test') } diff --git a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServer.java b/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServer.java deleted file mode 100644 index b3302f0e1..000000000 --- a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServer.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.reactivesocket.transport.tcp; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; - -public class EchoServer { - public static void main(String... args) throws Exception { - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new EchoServerHandler()); - } - }); - - Channel localhost = b.bind("0.0.0.0", 8025).sync().channel(); - localhost.closeFuture().sync(); - } finally { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } - } -} \ No newline at end of file diff --git a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java b/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java deleted file mode 100644 index c7101a80a..000000000 --- a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java +++ /dev/null @@ -1,68 +0,0 @@ -package io.reactivesocket.transport.tcp; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpServerCodec; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.transport.tcp.server.ReactiveSocketServerHandler; - -import java.util.List; - -public class EchoServerHandler extends ByteToMessageDecoder { - private static SimpleChannelInboundHandler httpHandler = new HttpServerHandler(); - - private static RequestHandler requestHandler = new RequestHandler.Builder().withRequestResponse(payload -> s -> { - s.onNext(payload); - s.onComplete(); - }).build(); - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - // Will use the first five bytes to detect a protocol. - if (in.readableBytes() < 5) { - return; - } - - final int magic1 = in.getUnsignedByte(in.readerIndex()); - final int magic2 = in.getUnsignedByte(in.readerIndex() + 1); - if (isHttp(magic1, magic2)) { - switchToHttp(ctx); - } else { - switchToReactiveSocket(ctx); - } - } - - private static boolean isHttp(int magic1, int magic2) { - return - magic1 == 'G' && magic2 == 'E' || // GET - magic1 == 'P' && magic2 == 'O' || // POST - magic1 == 'P' && magic2 == 'U' || // PUT - magic1 == 'H' && magic2 == 'E' || // HEAD - magic1 == 'O' && magic2 == 'P' || // OPTIONS - magic1 == 'P' && magic2 == 'A' || // PATCH - magic1 == 'D' && magic2 == 'E' || // DELETE - magic1 == 'T' && magic2 == 'R' || // TRACE - magic1 == 'C' && magic2 == 'O'; // CONNECT - } - - private void switchToHttp(ChannelHandlerContext ctx) { - ChannelPipeline p = ctx.pipeline(); - p.addLast(new HttpServerCodec()); - p.addLast(new HttpObjectAggregator(256 * 1024)); - p.addLast(httpHandler); - p.remove(this); - } - - private void switchToReactiveSocket(ChannelHandlerContext ctx) { - ChannelPipeline p = ctx.pipeline(); - ReactiveSocketServerHandler reactiveSocketHandler = - ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); - p.addLast(reactiveSocketHandler); - p.remove(this); - } -} diff --git a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/HttpServerHandler.java b/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/HttpServerHandler.java deleted file mode 100644 index 6dea07b2c..000000000 --- a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/HttpServerHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.reactivesocket.transport.tcp; - -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.*; - -@ChannelHandler.Sharable -public class HttpServerHandler extends SimpleChannelInboundHandler { - @Override - protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, request.content().retain()); - HttpUtil.setContentLength(response, response.content().readableBytes()); - if (HttpUtil.isKeepAlive(request)) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - ctx.writeAndFlush(response); - } else { - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java new file mode 100644 index 000000000..c4872e734 --- /dev/null +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.Frame; +import io.reactivesocket.rx.Observer; +import rx.Subscriber; + +public class ObserverSubscriber extends Subscriber { + + private final Observer o; + + public ObserverSubscriber(Observer o) { + this.o = o; + } + + @Override + public void onCompleted() { + o.onComplete(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(Frame frame) { + o.onNext(frame); + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketFrameCodec.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketFrameCodec.java new file mode 100644 index 000000000..ad40149a6 --- /dev/null +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketFrameCodec.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.ReferenceCountUtil; +import io.reactivesocket.Frame; + +import java.nio.ByteBuffer; + +/** + * A Codec that aids reading and writing of ReactiveSocket {@link Frame}s. + */ +public class ReactiveSocketFrameCodec extends ChannelDuplexHandler { + + private final MutableDirectByteBuf buffer = new MutableDirectByteBuf(Unpooled.buffer(0)); + private final Frame frame = Frame.allocate(buffer); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + try { + buffer.wrap((ByteBuf) msg); + frame.wrap(buffer, 0); + ctx.fireChannelRead(frame); + } finally { + ReferenceCountUtil.release(msg); + } + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof Frame) { + ByteBuffer src = ((Frame)msg).getByteBuffer(); + ByteBuf toWrite = ctx.alloc().buffer(src.remaining()).writeBytes(src); + ctx.write(toWrite, promise); + } else { + super.write(ctx, msg, promise); + } + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketLengthCodec.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketLengthCodec.java new file mode 100644 index 000000000..f7e3d5c29 --- /dev/null +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ReactiveSocketLengthCodec.java @@ -0,0 +1,28 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.agrona.BitUtil; + +public class ReactiveSocketLengthCodec extends LengthFieldBasedFrameDecoder { + + public ReactiveSocketLengthCodec() { + super(Integer.MAX_VALUE, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0); + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java new file mode 100644 index 000000000..fd5bdf14e --- /dev/null +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.DuplexConnection; +import io.reactivesocket.Frame; +import io.reactivesocket.internal.rx.BooleanDisposable; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; +import io.reactivex.netty.channel.Connection; +import org.reactivestreams.Publisher; +import rx.RxReactiveStreams; +import rx.Subscriber; + +import java.io.IOException; + +public class TcpDuplexConnection implements DuplexConnection { + + private final Connection connection; + private final rx.Observable input; + + public TcpDuplexConnection(Connection connection) { + this.connection = connection; + input = connection.getInput().publish().refCount(); + } + + @Override + public final Observable getInput() { + return new Observable() { + @Override + public void subscribe(Observer o) { + Subscriber subscriber = new ObserverSubscriber(o); + o.onSubscribe(new BooleanDisposable(new Runnable() { + @Override + public void run() { + subscriber.unsubscribe(); + } + })); + input.unsafeSubscribe(subscriber); + } + }; + } + + @Override + public void addOutput(Publisher o, Completable callback) { + connection.writeAndFlushOnEach(RxReactiveStreams.toObservable(o)) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + callback.success(); + } + + @Override + public void onError(Throwable e) { + callback.error(e); + } + + @Override + public void onNext(Void aVoid) { + // No Op. + } + }); + } + + @Override + public double availability() { + return connection.unsafeNettyChannel().isActive() ? 1.0 : 0.0; + } + + @Override + public void close() throws IOException { + connection.closeNow(); + } + + public String toString() { + return connection.unsafeNettyChannel().toString(); + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java deleted file mode 100644 index d82230552..000000000 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp.client; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.*; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.exceptions.TransportException; -import io.reactivesocket.internal.rx.EmptySubscription; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.agrona.BitUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.CopyOnWriteArrayList; - -public class ClientTcpDuplexConnection implements DuplexConnection { - private final Channel channel; - private final CopyOnWriteArrayList> subjects; - - private ClientTcpDuplexConnection(Channel channel, CopyOnWriteArrayList> subjects) { - this.subjects = subjects; - this.channel = channel; - } - - public static Publisher create(SocketAddress address, EventLoopGroup eventLoopGroup) { - return s -> { - CopyOnWriteArrayList> subjects = new CopyOnWriteArrayList<>(); - ReactiveSocketClientHandler clientHandler = new ReactiveSocketClientHandler(subjects); - Bootstrap bootstrap = new Bootstrap(); - ChannelFuture connect = bootstrap - .group(eventLoopGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.AUTO_READ, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - p.addLast( - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE >> 1, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0), - clientHandler - ); - } - }).connect(address); - - connect.addListener(connectFuture -> { - s.onSubscribe(EmptySubscription.INSTANCE); - if (connectFuture.isSuccess()) { - Channel ch = connect.channel(); - s.onNext(new ClientTcpDuplexConnection(ch, subjects)); - s.onComplete(); - } else { - s.onError(connectFuture.cause()); - } - }); - }; - } - - @Override - public final Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o.subscribe(new Subscriber() { - private Subscription subscription; - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - // TODO: wire back pressure - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - ByteBuf byteBuf = Unpooled.wrappedBuffer(frame.getByteBuffer()); - ChannelFuture channelFuture = channel.writeAndFlush(byteBuf); - channelFuture.addListener(future -> { - Throwable cause = future.cause(); - if (cause != null) { - if (cause instanceof ClosedChannelException) { - onError(new TransportException(cause)); - } else { - onError(cause); - } - } - }); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - subscription.cancel(); - } - - @Override - public void onComplete() { - callback.success(); - subscription.cancel(); - } - }); - } - - @Override - public double availability() { - return channel.isOpen() ? 1.0 : 0.0; - } - - @Override - public void close() throws IOException { - channel.close(); - } - - public String toString() { - if (channel == null) { - return "ClientTcpDuplexConnection(channel=null)"; - } - - return "ClientTcpDuplexConnection(channel=[" + - "remoteAddress=" + channel.remoteAddress() + "," + - "isActive=" + channel.isActive() + "," + - "isOpen=" + channel.isOpen() + "," + - "isRegistered=" + channel.isRegistered() + "," + - "isWritable=" + channel.isWritable() + "," + - "channelId=" + channel.id().asLongText() + - "])"; - - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java deleted file mode 100644 index ac773d782..000000000 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp.client; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.reactivesocket.Frame; -import io.reactivesocket.transport.tcp.MutableDirectByteBuf; -import io.reactivesocket.rx.Observer; - -import java.util.concurrent.CopyOnWriteArrayList; - -@ChannelHandler.Sharable -public class ReactiveSocketClientHandler extends ChannelInboundHandlerAdapter { - - private final CopyOnWriteArrayList> subjects; - - public ReactiveSocketClientHandler(CopyOnWriteArrayList> subjects) { - this.subjects = subjects; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object content) { - ByteBuf byteBuf = (ByteBuf) content; - try { - MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(byteBuf); - final Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity()); - subjects.forEach(o -> o.onNext(from)); - } finally { - byteBuf.release(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // Close the connection when an exception is raised. - cause.printStackTrace(); - ctx.close(); - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java index 576f71b76..641500774 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java @@ -1,81 +1,104 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package io.reactivesocket.transport.tcp.client; -import io.netty.channel.EventLoopGroup; -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; +import io.netty.buffer.ByteBuf; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Frame; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; import io.reactivesocket.rx.Completable; +import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec; +import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec; +import io.reactivesocket.transport.tcp.TcpDuplexConnection; +import io.reactivex.netty.channel.Connection; +import io.reactivex.netty.protocol.tcp.client.TcpClient; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import rx.RxReactiveStreams; +import rx.Single; +import rx.Single.OnSubscribe; +import rx.SingleSubscriber; +import rx.Subscriber; import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; +import java.util.function.Function; + +import static io.reactivesocket.DefaultReactiveSocket.fromClientConnection; -/** - * An implementation of {@link ReactiveSocketConnector} that creates Netty TCP ReactiveSockets. - */ public class TcpReactiveSocketConnector implements ReactiveSocketConnector { - private final ConnectionSetupPayload connectionSetupPayload; + + private final ConcurrentMap> socketFactories; + private final ConnectionSetupPayload setupPayload; private final Consumer errorStream; - private final EventLoopGroup eventLoopGroup; + private final Function> clientFactory; - public TcpReactiveSocketConnector(EventLoopGroup eventLoopGroup, ConnectionSetupPayload connectionSetupPayload, Consumer errorStream) { - this.connectionSetupPayload = connectionSetupPayload; + private TcpReactiveSocketConnector(ConnectionSetupPayload setupPayload, Consumer errorStream, + Function> clientFactory) { + this.setupPayload = setupPayload; this.errorStream = errorStream; - this.eventLoopGroup = eventLoopGroup; + this.clientFactory = clientFactory; + socketFactories = new ConcurrentHashMap<>(); } @Override public Publisher connect(SocketAddress address) { - Publisher connection - = ClientTcpDuplexConnection.create(address, eventLoopGroup); + return _connect(socketFactories.computeIfAbsent(address, socketAddress -> { + return clientFactory.apply(socketAddress) + .addChannelHandlerLast("length-codec", ReactiveSocketLengthCodec::new) + .addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new); + })); + } - return subscriber -> connection.subscribe(new Subscriber() { + private Publisher _connect(TcpClient client) { + Single r = Single.create(new OnSubscribe() { @Override - public void onSubscribe(Subscription s) { - subscriber.onSubscribe(s); - } + public void call(SingleSubscriber s) { + client.createConnectionRequest() + .toSingle() + .unsafeSubscribe(new Subscriber>() { + @Override + public void onCompleted() { + // Single contract does not allow complete without onNext and onNext here completes + // the outer subscriber + } - @Override - public void onNext(ClientTcpDuplexConnection connection) { - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection( - connection, connectionSetupPayload, errorStream); - reactiveSocket.start(new Completable() { - @Override - public void success() { - subscriber.onNext(reactiveSocket); - subscriber.onComplete(); - } + @Override + public void onError(Throwable e) { + s.onError(e); + } - @Override - public void error(Throwable e) { - subscriber.onError(e); - } - }); - } + @Override + public void onNext(Connection c) { + TcpDuplexConnection dc = new TcpDuplexConnection(c); + ReactiveSocket rs = fromClientConnection(dc, setupPayload, errorStream); + rs.start(new Completable() { + @Override + public void success() { + s.onSuccess(rs); + } - @Override - public void onError(Throwable t) { - subscriber.onError(t); + @Override + public void error(Throwable e) { + s.onError(e); + } + }); + } + }); } - - @Override - public void onComplete() {} }); + return RxReactiveStreams.toPublisher(r.toObservable()); + } + + public static TcpReactiveSocketConnector create(ConnectionSetupPayload setupPayload, + Consumer errorStream) { + return new TcpReactiveSocketConnector(setupPayload, errorStream, + socketAddress -> TcpClient.newClient(socketAddress)); + } + + public static TcpReactiveSocketConnector create(ConnectionSetupPayload setupPayload, + Consumer errorStream, + Function> clientFactory) { + return new TcpReactiveSocketConnector(setupPayload, errorStream, clientFactory); } } diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java deleted file mode 100644 index ec649990d..000000000 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp.server; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.DefaultReactiveSocket; -import io.reactivesocket.Frame; -import io.reactivesocket.LeaseGovernor; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.transport.tcp.MutableDirectByteBuf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.agrona.BitUtil.SIZE_OF_INT; - -public class ReactiveSocketServerHandler extends ChannelInboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); - private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE >> 1; - - private ConnectionSetupHandler setupHandler; - private LeaseGovernor leaseGovernor; - private ServerTcpDuplexConnection connection; - - protected ReactiveSocketServerHandler(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { - this.setupHandler = setupHandler; - this.leaseGovernor = leaseGovernor; - this.connection = null; - } - - public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler) { - return create(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR); - } - - public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { - return new ReactiveSocketServerHandler(setupHandler, leaseGovernor); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - ChannelPipeline cp = ctx.pipeline(); - if (cp.get(LengthFieldBasedFrameDecoder.class) == null) { - LengthFieldBasedFrameDecoder frameDecoder = - new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, SIZE_OF_INT, -1 * SIZE_OF_INT, 0); - ctx.pipeline() - .addBefore(ctx.name(), LengthFieldBasedFrameDecoder.class.getName(), frameDecoder); - } - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - connection = new ServerTcpDuplexConnection(ctx); - ReactiveSocket reactiveSocket = - DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace); - // Note: No blocking code here (still it should be refactored) - reactiveSocket.startAndWait(); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf content = (ByteBuf) msg; - try { - MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(content); - Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity()); - - if (connection != null) { - connection.getSubscribers().forEach(o -> o.onNext(from)); - } - } finally { - content.release(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - - logger.error("caught an unhandled exception", cause); - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ServerTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ServerTcpDuplexConnection.java deleted file mode 100644 index a271dc3e5..000000000 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ServerTcpDuplexConnection.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp.server; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public class ServerTcpDuplexConnection implements DuplexConnection { - private final CopyOnWriteArrayList> subjects; - - private final ChannelHandlerContext ctx; - - public ServerTcpDuplexConnection(ChannelHandlerContext ctx) { - this.subjects = new CopyOnWriteArrayList<>(); - this.ctx = ctx; - } - - public List> getSubscribers() { - return subjects; - } - - @Override - public final Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - ByteBuffer data = frame.getByteBuffer(); - ByteBuf byteBuf = Unpooled.wrappedBuffer(data); - ChannelFuture channelFuture = ctx.writeAndFlush(byteBuf); - channelFuture.addListener(future -> { - Throwable cause = future.cause(); - if (cause != null) { - cause.printStackTrace(); - callback.error(cause); - } - }); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return ctx.channel().isOpen() ? 1.0 : 0.0; - } - - @Override - public void close() throws IOException { - - } - - public String toString() { - if (ctx ==null || ctx.channel() == null) { - return getClass().getName() + ":channel=null"; - } - - Channel channel = ctx.channel(); - return getClass().getName() + ":channel=[" + - "remoteAddress=" + channel.remoteAddress() + "," + - "isActive=" + channel.isActive() + "," + - "isOpen=" + channel.isOpen() + "," + - "isRegistered=" + channel.isRegistered() + "," + - "isWritable=" + channel.isWritable() + "," + - "channelId=" + channel.id().asLongText() + - "]"; - - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java new file mode 100644 index 000000000..5cffca631 --- /dev/null +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java @@ -0,0 +1,125 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp.server; + +import io.netty.buffer.ByteBuf; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.DefaultReactiveSocket; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec; +import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec; +import io.reactivesocket.transport.tcp.TcpDuplexConnection; +import io.reactivex.netty.channel.Connection; +import io.reactivex.netty.protocol.tcp.server.ConnectionHandler; +import io.reactivex.netty.protocol.tcp.server.TcpServer; +import rx.Completable; +import rx.Completable.CompletableOnSubscribe; +import rx.Completable.CompletableSubscriber; +import rx.Observable; + +import java.net.SocketAddress; + +public class TcpReactiveSocketServer { + + private final TcpServer server; + + private TcpReactiveSocketServer(TcpServer server) { + this.server = server; + } + + public StartedServer start(ConnectionSetupHandler setupHandler) { + return start(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR); + } + + public StartedServer start(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { + server.start(new ConnectionHandler() { + @Override + public Observable handle(Connection newConnection) { + TcpDuplexConnection c = new TcpDuplexConnection(newConnection); + ReactiveSocket rs = DefaultReactiveSocket.fromServerConnection(c, setupHandler, leaseGovernor, + Throwable::printStackTrace); + return Completable.create(new CompletableOnSubscribe() { + @Override + public void call(CompletableSubscriber s) { + rs.start(new io.reactivesocket.rx.Completable() { + @Override + public void success() { + rs.onShutdown(new io.reactivesocket.rx.Completable() { + @Override + public void success() { + s.onCompleted(); + } + + @Override + public void error(Throwable e) { + s.onError(e); + } + }); + } + + @Override + public void error(Throwable e) { + s.onError(e); + } + }); + } + }).toObservable(); + } + }); + + return new StartedServer(); + } + + public static TcpReactiveSocketServer create() { + return create(TcpServer.newServer()); + } + + public static TcpReactiveSocketServer create(int port) { + return create(TcpServer.newServer(port)); + } + + public static TcpReactiveSocketServer create(TcpServer rxNettyServer) { + return new TcpReactiveSocketServer(configure(rxNettyServer)); + } + + private static TcpServer configure(TcpServer rxNettyServer) { + return rxNettyServer.addChannelHandlerLast("line-codec", ReactiveSocketLengthCodec::new) + .addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new); + } + + public final class StartedServer { + + public SocketAddress getServerAddress() { + return server.getServerAddress(); + } + + public int getServerPort() { + return server.getServerPort(); + } + + public void awaitShutdown() { + server.awaitShutdown(); + } + + public void shutdown() { + server.shutdown(); + } + } +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java index 856cd559b..273145516 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java @@ -15,153 +15,50 @@ */ package io.reactivesocket.transport.tcp; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.DefaultReactiveSocket; import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.transport.tcp.client.ClientTcpDuplexConnection; -import io.reactivesocket.transport.tcp.server.ReactiveSocketServerHandler; import io.reactivesocket.test.TestUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import rx.Observable; -import rx.RxReactiveStreams; import rx.observers.TestSubscriber; -import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; -public class ClientServerTest { +import static io.reactivesocket.test.TestUtil.*; +import static rx.RxReactiveStreams.*; - static ReactiveSocket client; - static Channel serverChannel; - - static EventLoopGroup bossGroup = new NioEventLoopGroup(1); - static EventLoopGroup workerGroup = new NioEventLoopGroup(4); - - static RequestHandler requestHandler = new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - - @BeforeClass - public static void setup() throws Exception { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - ReactiveSocketServerHandler serverHandler = - ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); - pipeline.addLast(serverHandler); - } - }); - - serverChannel = b.bind("localhost", 7878).sync().channel(); - - ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable( - ClientTcpDuplexConnection.create(InetSocketAddress.createUnresolved("localhost", 7878), new NioEventLoopGroup()) - ).toBlocking().single(); - - client = DefaultReactiveSocket - .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace); - - client.startAndWait(); - } +public class ClientServerTest { - @AfterClass - public static void tearDown() { - serverChannel.close(); - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } + @Rule + public final ClientSetupRule setup = new ClientSetupRule(); - @Test + @Test(timeout = 60000) public void testRequestResponse1() { requestResponseN(1500, 1); } - @Test + @Test(timeout = 60000) public void testRequestResponse10() { requestResponseN(1500, 10); } - @Test + @Test(timeout = 60000) public void testRequestResponse100() { requestResponseN(1500, 100); } - @Test + @Test(timeout = 60000) public void testRequestResponse10_000() { requestResponseN(60_000, 10_000); } - @Test + @Test(timeout = 60000) public void testRequestStream() { TestSubscriber ts = TestSubscriber.create(); - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); + toObservable(setup.getReactiveSocket().requestStream(utf8EncodedPayload("hello", "metadata"))) + .subscribe(ts); ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); @@ -170,15 +67,13 @@ public void testRequestStream() { ts.assertCompleted(); } - @Test + @Test(timeout = 60000) public void testRequestSubscription() throws InterruptedException { TestSubscriber ts = TestSubscriber.create(); - RxReactiveStreams - .toObservable(client.requestSubscription( - TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); + toObservable(setup.getReactiveSocket().requestSubscription(utf8EncodedPayload("hello sub", "metadata sub"))) + .take(10) + .subscribe(ts); ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); ts.assertValueCount(10); @@ -191,15 +86,13 @@ public void requestResponseN(int timeout, int count) { TestSubscriber ts = TestSubscriber.create(); Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable(client - .requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .map(payload -> TestUtil.byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); + .range(1, count) + .flatMap(i -> + toObservable(setup.getReactiveSocket().requestResponse(utf8EncodedPayload("hello", "metadata"))) + .map(payload -> byteToString(payload.getData())) + ) + .doOnError(Throwable::printStackTrace) + .subscribe(ts); ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); ts.assertValueCount(count); diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java new file mode 100644 index 000000000..e465d7119 --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java @@ -0,0 +1,78 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import rx.RxReactiveStreams; + +import java.net.SocketAddress; + +public class ClientSetupRule extends ExternalResource { + + private TcpReactiveSocketConnector client; + private TcpReactiveSocketServer server; + private SocketAddress serverAddress; + private ReactiveSocket reactiveSocket; + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + server = TcpReactiveSocketServer.create(0); + serverAddress = server.start(new ConnectionSetupHandler() { + @Override + public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) { + return new TestRequestHandler(); + } + }).getServerAddress(); + + client = TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), + Throwable::printStackTrace); + reactiveSocket = RxReactiveStreams.toObservable(client.connect(serverAddress)) + .toSingle().toBlocking().value(); + + base.evaluate(); + } + }; + } + + public TcpReactiveSocketConnector getClient() { + return client; + } + + public TcpReactiveSocketServer getServer() { + return server; + } + + public SocketAddress getServerAddress() { + return serverAddress; + } + + public ReactiveSocket getReactiveSocket() { + return reactiveSocket; + } +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java new file mode 100644 index 000000000..2cf54ff3a --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +public class PayloadImpl implements Payload { + + private final ByteBuffer data; + + public PayloadImpl(String data) { + this.data = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)); + } + + public PayloadImpl(String data, Charset charset) { + this.data = ByteBuffer.wrap(data.getBytes(charset)); + } + + public PayloadImpl(byte[] data) { + this.data = ByteBuffer.wrap(data); + } + + public PayloadImpl(ByteBuffer data) { + this.data = data; + } + + @Override + public ByteBuffer getData() { + return data; + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java index 8b0ad0aac..e41d017d2 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java @@ -15,14 +15,11 @@ */ package io.reactivesocket.transport.tcp; -import io.netty.channel.nio.NioEventLoopGroup; import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.DefaultReactiveSocket; import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.transport.tcp.client.ClientTcpDuplexConnection; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; import org.HdrHistogram.Recorder; -import org.reactivestreams.Publisher; import rx.Observable; import rx.RxReactiveStreams; import rx.Subscriber; @@ -33,19 +30,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class Ping { - public static void main(String... args) throws Exception { - NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); +public final class Ping { - Publisher publisher = ClientTcpDuplexConnection - .create(InetSocketAddress.createUnresolved("localhost", 7878), eventLoopGroup); + public static void main(String... args) throws Exception { - ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().last(); - ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8"); ReactiveSocket reactiveSocket = - DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace); - - reactiveSocket.startAndWait(); + RxReactiveStreams.toObservable(TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), + Throwable::printStackTrace) + .connect(new InetSocketAddress("localhost", 7878))) + .toSingle() + .toBlocking() + .value(); byte[] data = "hello".getBytes(); @@ -80,15 +75,12 @@ public ByteBuffer getMetadata() { .flatMap(i -> { long start = System.nanoTime(); - return RxReactiveStreams - .toObservable( - reactiveSocket - .requestResponse(keyPayload)) - .doOnError(Throwable::printStackTrace) - .doOnNext(s -> { - long diff = System.nanoTime() - start; - histogram.recordValue(diff); - }); + return RxReactiveStreams.toObservable(reactiveSocket.requestResponse(keyPayload)) + .doOnError(Throwable::printStackTrace) + .doOnNext(s -> { + long diff = System.nanoTime() - start; + histogram.recordValue(diff); + }); }, 16) .doOnError(Throwable::printStackTrace) .subscribe(new Subscriber() { diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java index c118674cf..58cd04f39 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java @@ -15,114 +15,42 @@ */ package io.reactivesocket.transport.tcp; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; +import io.netty.util.internal.ThreadLocalRandom; import io.reactivesocket.Payload; import io.reactivesocket.RequestHandler; -import io.reactivesocket.transport.tcp.server.ReactiveSocketServerHandler; -import io.reactivesocket.test.TestUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; import rx.Observable; import rx.RxReactiveStreams; import java.nio.ByteBuffer; -import java.util.Random; -public class Pong { +public final class Pong { + public static void main(String... args) throws Exception { byte[] response = new byte[1024]; - Random r = new Random(); - r.nextBytes(response); - - RequestHandler requestHandler = new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return subscriber -> { - Payload responsePayload = new Payload() { - ByteBuffer data = ByteBuffer.wrap(response); - ByteBuffer metadata = ByteBuffer.allocate(0); - - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return metadata; - } - }; - - subscriber.onNext(responsePayload); - subscriber.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - Observable observable = - RxReactiveStreams - .toObservable(inputs) - .map(input -> input); - return RxReactiveStreams.toPublisher(observable); - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - ReactiveSocketServerHandler serverHandler = - ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); - pipeline.addLast(serverHandler); - } - }); - - Channel localhost = b.bind("localhost", 7878).sync().channel(); - localhost.closeFuture().sync(); + ThreadLocalRandom.current().nextBytes(response); + + TcpReactiveSocketServer.create(7878) + .start((setupPayload, reactiveSocket) -> { + return new RequestHandler.Builder() + .withRequestResponse(payload -> { + Payload responsePayload = new Payload() { + ByteBuffer data = ByteBuffer.wrap(response); + ByteBuffer metadata = ByteBuffer.allocate(0); + + @Override + public ByteBuffer getData() { + return data; + } + + @Override + public ByteBuffer getMetadata() { + return metadata; + } + }; + return RxReactiveStreams.toPublisher(Observable.just(responsePayload)); + }) + .build(); + }).awaitShutdown(); } } diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java new file mode 100644 index 000000000..33491d644 --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.Payload; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.exceptions.UnsupportedSetupException; +import io.reactivesocket.test.TestUtil; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import rx.Observable; + +import static rx.RxReactiveStreams.*; + +public class TestRequestHandler extends RequestHandler { + + @Override + public Publisher handleRequestResponse(Payload payload) { + return toPublisher(Observable.just(TestUtil.utf8EncodedPayload("hello world", "metadata"))); + } + + @Override + public Publisher handleRequestStream(Payload payload) { + return toPublisher(toObservable(handleRequestResponse(payload)).repeat(10)); + } + + @Override + public Publisher handleSubscription(Payload payload) { + return toPublisher(toObservable(handleRequestStream(payload)).repeat()); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return Subscriber::onComplete; + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return toPublisher(Observable.error(new UnsupportedSetupException("Channel not supported."))); + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return toPublisher(Observable.error(new UnsupportedSetupException("Metadata push not supported."))); + } +} diff --git a/reactivesocket-transport-websocket/build.gradle b/reactivesocket-transport-websocket/build.gradle index a8b7eb4ea..80439f63b 100644 --- a/reactivesocket-transport-websocket/build.gradle +++ b/reactivesocket-transport-websocket/build.gradle @@ -1,6 +1,7 @@ dependencies { compile project(':reactivesocket-core') compile project(':reactivesocket-transport-tcp') + compile 'io.netty:netty-codec-http:4.1.0.Final' testCompile project(':reactivesocket-test') } diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java index 7c2d4bb67..221c2ae2e 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java @@ -16,9 +16,7 @@ package io.reactivesocket.transport.websocket.server; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.reactivesocket.ConnectionSetupHandler; @@ -27,12 +25,9 @@ import io.reactivesocket.LeaseGovernor; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; -import io.reactivesocket.transport.tcp.server.ServerTcpDuplexConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; - public class ReactiveSocketServerHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); @@ -43,7 +38,6 @@ public class ReactiveSocketServerHandler extends SimpleChannelInboundHandler