Skip to content

Commit

Permalink
Adds support for serialization factories.
Browse files Browse the repository at this point in the history
  • Loading branch information
crykn committed Aug 14, 2017
1 parent f43396b commit 6b4c611
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 250 deletions.
22 changes: 10 additions & 12 deletions src/main/java/com/esotericsoftware/kryonet/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
import com.esotericsoftware.kryonet.FrameworkMessage.RegisterTCP;
import com.esotericsoftware.kryonet.FrameworkMessage.RegisterUDP;
import com.esotericsoftware.kryonet.serialization.KryoSerialization;
import com.esotericsoftware.kryonet.serialization.KryoSerializationFactory;
import com.esotericsoftware.kryonet.serialization.KryoSerializationFactory.KryoSerialization;
import com.esotericsoftware.kryonet.serialization.Serialization;
import com.esotericsoftware.kryonet.serialization.SerializationFactory;

/**
* Represents a TCP and optionally a UDP connection to a {@link Server}.
Expand Down Expand Up @@ -118,15 +120,15 @@ public Client() {
* largest object that will be sent or received.
*/
public Client(int writeBufferSize, int objectBufferSize) {
this(writeBufferSize, objectBufferSize, new KryoSerialization());
this(writeBufferSize, objectBufferSize, new KryoSerializationFactory());
}

public Client(int writeBufferSize, int objectBufferSize,
Serialization serialization) {
SerializationFactory serializationFactory) {
super();
endPoint = this;

this.serialization = serialization;
this.serialization = serializationFactory.newInstance(this);

this.discoveryHandler = ClientDiscoveryHandler.DEFAULT;

Expand All @@ -144,10 +146,6 @@ public void setDiscoveryHandler(
discoveryHandler = newDiscoveryHandler;
}

public Serialization getSerialization() {
return serialization;
}

public Kryo getKryo() {
return ((KryoSerialization) serialization).getKryo();
}
Expand Down Expand Up @@ -259,7 +257,7 @@ public void connect(int timeout, InetAddress host, int tcpPort, int udpPort)
&& System.currentTimeMillis() < endTime) {
RegisterUDP registerUDP = new RegisterUDP();
registerUDP.connectionID = id;
udp.send(this, registerUDP, udpAddress);
udp.send(registerUDP, udpAddress);
try {
udpRegistrationLock.wait(100);
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -352,7 +350,7 @@ public void update(int timeout) throws IOException {
if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
if (selectionKey.attachment() == tcp) {
while (true) {
Object object = tcp.readObject(this);
Object object = tcp.readObject();
if (object == null)
break;
if (!tcpRegistered) {
Expand Down Expand Up @@ -416,7 +414,7 @@ public void update(int timeout) throws IOException {
} else {
if (udp.readFromAddress() == null)
continue;
Object object = udp.readObject(this);
Object object = udp.readObject();
if (object == null)
continue;
if (DEBUG) {
Expand Down Expand Up @@ -581,7 +579,7 @@ public Thread getUpdateThread() {
private void broadcast(int udpPort, DatagramSocket socket)
throws IOException {
ByteBuffer dataBuffer = ByteBuffer.allocate(64);
serialization.write(null, dataBuffer, new DiscoverHost());
serialization.write(dataBuffer, new DiscoverHost());
dataBuffer.flip();
byte[] data = new byte[dataBuffer.limit()];
dataBuffer.get(data);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/esotericsoftware/kryonet/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public int sendTCP(Object object) {
if (object == null)
throw new IllegalArgumentException("object cannot be null.");
try {
int length = tcp.send(this, object);
int length = tcp.send(object);
if (length == 0) {
if (TRACE)
trace("kryonet", this + " TCP had nothing to send.");
Expand Down Expand Up @@ -163,7 +163,7 @@ public int sendUDP(Object object) {
if (address == null)
throw new SocketException("Connection is closed.");

int length = udp.send(this, object, address);
int length = udp.send(object, address);
if (length == 0) {
if (TRACE)
trace("kryonet", this + " UDP had nothing to send.");
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/com/esotericsoftware/kryonet/EndPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,18 @@
import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryonet.serialization.KryoSerialization;
import com.esotericsoftware.kryonet.serialization.Serialization;
import com.esotericsoftware.kryonet.serialization.KryoSerializationFactory.KryoSerialization;

/**
* Represents the local end point of a connection.
*
* @author Nathan Sweet <[email protected]>
*/
public interface EndPoint extends Runnable {
/**
* Gets the serialization instance that will be used to serialize and
* deserialize objects.
*/
public Serialization getSerialization();

/**
* If the listener already exists, it is not added again.
* Adds a listener to the endpoint. If the listener already exists, it is
* <i>not</i> added again.
*/
public void addListener(Listener listener);

Expand Down
34 changes: 19 additions & 15 deletions src/main/java/com/esotericsoftware/kryonet/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@
import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
import com.esotericsoftware.kryonet.FrameworkMessage.RegisterTCP;
import com.esotericsoftware.kryonet.FrameworkMessage.RegisterUDP;
import com.esotericsoftware.kryonet.serialization.KryoSerialization;
import com.esotericsoftware.kryonet.serialization.Serialization;
import com.esotericsoftware.kryonet.serialization.KryoSerializationFactory;
import com.esotericsoftware.kryonet.serialization.SerializationFactory;

/**
* Manages TCP and optionally UDP connections from many {@linkplain Client Clients}.
* Manages TCP and optionally UDP connections from many {@linkplain Client
* Clients}.
*
* @author Nathan Sweet <[email protected]>
*/
public class Server implements EndPoint {
private final Serialization serialization;
private final SerializationFactory serializationFactory;
private final int writeBufferSize, objectBufferSize;
private final Selector selector;
private int emptySelects;
Expand Down Expand Up @@ -134,15 +135,15 @@ public Server() {
* largest object that will be sent or received.
*/
public Server(int writeBufferSize, int objectBufferSize) {
this(writeBufferSize, objectBufferSize, new KryoSerialization());
this(writeBufferSize, objectBufferSize, new KryoSerializationFactory());
}

public Server(int writeBufferSize, int objectBufferSize,
Serialization serialization) {
SerializationFactory serializationFactory) {
this.writeBufferSize = writeBufferSize;
this.objectBufferSize = objectBufferSize;

this.serialization = serialization;
this.serializationFactory = serializationFactory;

this.discoveryHandler = ServerDiscoveryHandler.DEFAULT;

Expand All @@ -158,12 +159,12 @@ public void setDiscoveryHandler(
discoveryHandler = newDiscoveryHandler;
}

public Serialization getSerialization() {
return serialization;
public SerializationFactory getSerializationFactory() {
return serializationFactory;
}

public Kryo getKryo() {
return ((KryoSerialization) serialization).getKryo();
return ((KryoSerializationFactory) serializationFactory).getKryo();
}

/**
Expand Down Expand Up @@ -206,7 +207,9 @@ public void bind(InetSocketAddress tcpPort, InetSocketAddress udpPort)
+ "/TCP");

if (udpPort != null) {
udp = new UdpConnection(serialization, objectBufferSize);
udp = new UdpConnection(
serializationFactory.newInstance(null),
objectBufferSize);
udp.bind(selector, udpPort);
if (DEBUG)
debug("kryonet", "Accepting connections on port: "
Expand Down Expand Up @@ -282,7 +285,7 @@ public void update(int timeout) throws IOException {
try {
while (true) {
Object object = fromConnection.tcp
.readObject(fromConnection);
.readObject();
if (object == null)
break;
if (DEBUG) {
Expand Down Expand Up @@ -391,7 +394,7 @@ public void update(int timeout) throws IOException {

Object object;
try {
object = udp.readObject(fromConnection);
object = udp.readObject();
} catch (KryoNetException ex) {
if (WARN) {
if (fromConnection != null) {
Expand Down Expand Up @@ -442,7 +445,7 @@ public void update(int timeout) throws IOException {
try {
boolean responseSent = discoveryHandler
.onDiscoverHost(udp.datagramChannel,
fromAddress, serialization);
fromAddress);
if (DEBUG && responseSent)
debug("kryonet",
"Responded to host discovery from: "
Expand Down Expand Up @@ -548,7 +551,8 @@ public void stop() {

private void acceptOperation(SocketChannel socketChannel) {
Connection connection = newConnection();
connection.initialize(serialization, writeBufferSize, objectBufferSize);
connection.initialize(serializationFactory.newInstance(connection),
writeBufferSize, objectBufferSize);
connection.endPoint = this;
UdpConnection udp = this.udp;
if (udp != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.channels.DatagramChannel;

import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
import com.esotericsoftware.kryonet.serialization.Serialization;

public interface ServerDiscoveryHandler {
/**
Expand All @@ -37,8 +36,7 @@ public interface ServerDiscoveryHandler {

@Override
public boolean onDiscoverHost(DatagramChannel datagramChannel,
InetSocketAddress fromAddress, Serialization serialization)
throws IOException {
InetSocketAddress fromAddress) throws IOException {
datagramChannel.send(emptyBuffer, fromAddress);
return true;
}
Expand All @@ -49,15 +47,12 @@ public boolean onDiscoverHost(DatagramChannel datagramChannel,
*
* @param fromAddress
* {@link InetSocketAddress} the {@link DiscoverHost} came from
* @param serialization
* the {@link Server}'s {@link Serialization} instance
* @return true if a response was sent to {@code fromAddress}, false
* otherwise
* @throws IOException
* from the use of
* {@link DatagramChannel#send(ByteBuffer, java.net.SocketAddress)}
*/
public boolean onDiscoverHost(DatagramChannel datagramChannel,
InetSocketAddress fromAddress, Serialization serialization)
throws IOException;
InetSocketAddress fromAddress) throws IOException;
}
26 changes: 12 additions & 14 deletions src/main/java/com/esotericsoftware/kryonet/TcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

package com.esotericsoftware.kryonet;

import static com.esotericsoftware.minlog.Log.DEBUG;
import static com.esotericsoftware.minlog.Log.TRACE;
import static com.esotericsoftware.minlog.Log.debug;
import static com.esotericsoftware.minlog.Log.trace;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
Expand All @@ -30,11 +35,6 @@

import com.esotericsoftware.kryonet.serialization.Serialization;

import static com.esotericsoftware.minlog.Log.DEBUG;
import static com.esotericsoftware.minlog.Log.TRACE;
import static com.esotericsoftware.minlog.Log.debug;
import static com.esotericsoftware.minlog.Log.trace;

/**
* @author Nathan Sweet <[email protected]>
*/
Expand Down Expand Up @@ -130,7 +130,7 @@ public void connect(Selector selector, SocketAddress remoteAddress,
}
}

public Object readObject(Connection connection) throws IOException {
public Object readObject() throws IOException {
SocketChannel socketChannel = this.socketChannel;
if (socketChannel == null)
throw new SocketException("Connection is closed.");
Expand Down Expand Up @@ -180,7 +180,7 @@ public Object readObject(Connection connection) throws IOException {
readBuffer.limit(startPosition + length);
Object object;
try {
object = serialization.read(connection, readBuffer);
object = serialization.read(readBuffer);
} catch (Exception ex) {
throw new KryoNetException("Error during deserialization.", ex);
}
Expand Down Expand Up @@ -227,7 +227,7 @@ private boolean writeToSocket() throws IOException {
/**
* This method is thread safe.
*/
public int send(Connection connection, Object object) throws IOException {
public int send(Object object) throws IOException {
SocketChannel socketChannel = this.socketChannel;
if (socketChannel == null)
throw new SocketException("Connection is closed.");
Expand All @@ -239,7 +239,7 @@ public int send(Connection connection, Object object) throws IOException {

// Write data.
try {
serialization.write(connection, writeBuffer, object);
serialization.write(writeBuffer, object);
} catch (KryoNetException ex) {
throw new KryoNetException("Error serializing object of type: "
+ object.getClass().getName(), ex);
Expand Down Expand Up @@ -267,13 +267,11 @@ public int send(Connection connection, Object object) throws IOException {
/ (float) writeBuffer.capacity();
if (DEBUG && percentage > 0.75f)
debug("kryonet",
connection
+ " TCP write buffer is approaching capacity: "
" TCP write buffer is approaching capacity: "
+ percentage + "%");
else if (TRACE && percentage > 0.25f)
trace("kryonet",
connection + " TCP write buffer utilization: "
+ percentage + "%");
trace("kryonet", " TCP write buffer utilization: "
+ percentage + "%");
}

lastWriteTime = System.currentTimeMillis();
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/com/esotericsoftware/kryonet/UdpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.esotericsoftware.kryonet;

import static com.esotericsoftware.minlog.Log.DEBUG;
import static com.esotericsoftware.minlog.Log.debug;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -30,9 +33,6 @@

import com.esotericsoftware.kryonet.serialization.Serialization;

import static com.esotericsoftware.minlog.Log.DEBUG;
import static com.esotericsoftware.minlog.Log.debug;

/**
* @author Nathan Sweet <[email protected]>
*/
Expand Down Expand Up @@ -114,11 +114,11 @@ public InetSocketAddress readFromAddress() throws IOException {
return connectedAddress;
}

public Object readObject(Connection connection) {
public Object readObject() {
readBuffer.flip();
try {
try {
Object object = serialization.read(connection, readBuffer);
Object object = serialization.read(readBuffer);
if (readBuffer.hasRemaining())
throw new KryoNetException("Incorrect number of bytes ("
+ readBuffer.remaining()
Expand All @@ -136,15 +136,14 @@ public Object readObject(Connection connection) {
/**
* This method is thread safe.
*/
public int send(Connection connection, Object object, SocketAddress address)
throws IOException {
public int send(Object object, SocketAddress address) throws IOException {
DatagramChannel datagramChannel = this.datagramChannel;
if (datagramChannel == null)
throw new SocketException("Connection is closed.");
synchronized (writeLock) {
try {
try {
serialization.write(connection, writeBuffer, object);
serialization.write(writeBuffer, object);
} catch (Exception ex) {
throw new KryoNetException(
"Error serializing object of type: "
Expand Down
Loading

0 comments on commit 6b4c611

Please sign in to comment.