Skip to content

Commit

Permalink
Using RxNetty for tcp transport (#101)
Browse files Browse the repository at this point in the history
* Using RxNetty for tcp transport

Current TCP transport implementation lacks a few critical features around insights and network flow control.
Since RxNetty already has these features, it makes sense to use it.

* Merge branch 'master' of https://github.com/ReactiveSocket/reactivesocket-java into rxnetty

# Conflicts:
#	reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java
#	reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java
#	reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java

* Review comments

Also updated to rxnetty-0.5.2-rc.3
  • Loading branch information
NiteshKant authored and stevegury committed Jun 23, 2016
1 parent 59a4ebe commit 023bb54
Show file tree
Hide file tree
Showing 26 changed files with 750 additions and 909 deletions.
1 change: 1 addition & 0 deletions reactivesocket-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ dependencies {
compile project(':reactivesocket-transport-tcp')

compile project(':reactivesocket-test')
runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.21'
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,53 @@
*/
package io.reactivesocket.examples;

import io.netty.channel.nio.NioEventLoopGroup;
import io.reactivesocket.ConnectionSetupHandler;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.RequestHandler;
import io.reactivesocket.client.ClientBuilder;
import io.reactivesocket.test.TestUtil;
import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
import io.reactivesocket.util.Unsafe;
import io.reactivesocket.test.TestUtil;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.RxReactiveStreams;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public final class EchoClient {

private static Publisher<List<SocketAddress>> source(SocketAddress sa) {
return sub -> sub.onNext(Collections.singletonList(sa));
}

public static void main(String... args) throws Exception {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8888);

ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> {
return new RequestHandler.Builder()
.withRequestResponse(
payload -> RxReactiveStreams.toPublisher(Observable.just(payload)))
.build();
};

SocketAddress serverAddress = TcpReactiveSocketServer.create()
.start(setupHandler)
.getServerAddress();

ConnectionSetupPayload setupPayload =
ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);

TcpReactiveSocketConnector tcp =
new TcpReactiveSocketConnector(new NioEventLoopGroup(8), setupPayload, System.err::println);
TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace);

ReactiveSocket client = ClientBuilder.instance()
.withSource(source(address))
.withSource(RxReactiveStreams.toPublisher(Observable.just(Collections.singletonList(serverAddress))))
.withConnector(tcp)
.build();

Unsafe.awaitAvailability(client);

Payload request = TestUtil.utf8EncodedPayload("Hello", "META");
Payload response = Unsafe.blockingSingleWait(client.requestResponse(request), 1, TimeUnit.SECONDS);

System.out.println(response);
RxReactiveStreams.toObservable(client.requestResponse(request))
.map(TestUtil::dataAsString)
.toBlocking()
.forEach(System.out::println);
}
}
20 changes: 20 additions & 0 deletions reactivesocket-examples/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright 2015 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%c %d{dd MMM yyyy HH:mm:ss,SSS} %5p [%t] (%F:%L) - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public static Payload utf8EncodedPayload(final String data, final String metadat
return new PayloadImpl(data, metadata);
}

public static String dataAsString(Payload payload) {
ByteBuffer data = payload.getData();
byte[] dst = new byte[data.remaining()];
data.get(dst);
return new String(dst);
}

public static String byteToString(ByteBuffer byteBuffer)
{
byteBuffer = byteBuffer.duplicate();
Expand Down
3 changes: 1 addition & 2 deletions reactivesocket-transport-tcp/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
dependencies {
compile project(':reactivesocket-core')
compile 'io.netty:netty-handler:4.1.0.CR7'
compile 'io.netty:netty-codec-http:4.1.0.CR7'
compile 'io.reactivex:rxnetty-tcp:0.5.2-rc.3'

testCompile project(':reactivesocket-test')
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.reactivesocket.Frame;
import io.reactivesocket.rx.Observer;
import rx.Subscriber;

public class ObserverSubscriber extends Subscriber<Frame> {

private final Observer<Frame> o;

public ObserverSubscriber(Observer<Frame> o) {
this.o = o;
}

@Override
public void onCompleted() {
o.onComplete();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(Frame frame) {
o.onNext(frame);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.reactivesocket.Frame;

import java.nio.ByteBuffer;

/**
* A Codec that aids reading and writing of ReactiveSocket {@link Frame}s.
*/
public class ReactiveSocketFrameCodec extends ChannelDuplexHandler {

private final MutableDirectByteBuf buffer = new MutableDirectByteBuf(Unpooled.buffer(0));
private final Frame frame = Frame.allocate(buffer);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
try {
buffer.wrap((ByteBuf) msg);
frame.wrap(buffer, 0);
ctx.fireChannelRead(frame);
} finally {
ReferenceCountUtil.release(msg);
}
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof Frame) {
ByteBuffer src = ((Frame)msg).getByteBuffer();
ByteBuf toWrite = ctx.alloc().buffer(src.remaining()).writeBytes(src);
ctx.write(toWrite, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.agrona.BitUtil;

public class ReactiveSocketLengthCodec extends LengthFieldBasedFrameDecoder {

public ReactiveSocketLengthCodec() {
super(Integer.MAX_VALUE, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0);
}
}
Loading

0 comments on commit 023bb54

Please sign in to comment.