Skip to content

Commit

Permalink
feat: update packet splitting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
klikli-dev committed Apr 9, 2023
1 parent a2b4ebf commit 1bd67a9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class OccultismPackets {
PROTOCOL_VERSION::equals,
PROTOCOL_VERSION::equals
);
public static final PacketSplitter SPLITTER = new PacketSplitter(5, INSTANCE, CHANNEL);
public static final PacketSplitter SPLITTER = new PacketSplitter(10, INSTANCE, CHANNEL);

private static int ID = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import net.minecraft.resources.ResourceLocation;
import net.minecraft.server.level.ServerPlayer;
import net.minecraftforge.network.NetworkEvent;
import net.minecraftforge.network.NetworkEvent.Context;
import net.minecraftforge.network.PacketDistributor;
import net.minecraftforge.network.simple.SimpleChannel;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -41,30 +40,31 @@
import java.util.function.Supplier;

public class PacketSplitter {
private final static int MAX_PACKET_SIZE = 32767;
private static final Map<Integer, Map<Integer, byte[]>> packageCache = new HashMap<>();
private final ResourceLocation CHANNEL_ID;
private final SimpleChannel CHANNEL;
private final static int MAX_PACKET_SIZE = 943718;
private static final Map<Integer, Map<Integer, byte[]>> PACKAGE_CACHE = new HashMap<>();

private final ResourceLocation channelId;
private final SimpleChannel channel;
private final Map<Integer, ServerPlayer> messageTargets = new HashMap<>();
private final Map<Integer, Integer> packetMaximums = new HashMap<>();
private final Set<Class<?>> messagesToSplit = new HashSet<>();
private final int maxNumberOfMessages;
private int comId = 0;
private int ID;
private int id;

public PacketSplitter(int maxNumberOfMessages, SimpleChannel CHANNEL, ResourceLocation CHANNEL_ID) {
public PacketSplitter(int maxNumberOfMessages, SimpleChannel channel, ResourceLocation CHANNEL_ID) {
this.maxNumberOfMessages = maxNumberOfMessages;
this.CHANNEL = CHANNEL;
this.CHANNEL_ID = CHANNEL_ID;
this.channel = channel;
this.channelId = CHANNEL_ID;
}

public boolean shouldMessageBeSplit(Class<?> clazz) {
return this.messagesToSplit.contains(clazz);
}

public void sendToPlayer(ServerPlayer player, Object message) {
if (this.ID == 0) this.ID++; // in case we wrapped around, 0 is reserved for server
int id = this.ID++;
if (this.id == 0) this.id++; // in case we wrapped around, 0 is reserved for server
int id = this.id++;
this.messageTargets.put(id, player);
this.sendPacket(message, id, PacketDistributor.PLAYER.with(() -> player));
}
Expand All @@ -81,11 +81,11 @@ private void sendPacket(Object Message, int id, PacketDistributor.PacketTarget t
//write the message id to be able to figure out where the packet is supposed to go in the wrapper
bufIn.writeInt(id);

int index = this.CHANNEL.encodeMessage(Message, bufIn);
target.send(target.getDirection().buildPacket(Pair.of(bufIn, index), this.CHANNEL_ID).getThis());
int index = this.channel.encodeMessage(Message, bufIn);
target.send(target.getDirection().buildPacket(Pair.of(bufIn, index), this.channelId).getThis());
}

public <MSG> void registerMessage(int index, Class<MSG> messageType, BiConsumer<MSG, FriendlyByteBuf> encoder, Function<FriendlyByteBuf, MSG> decoder, BiConsumer<MSG, Supplier<Context>> messageConsumer) {
public <MSG> void registerMessage(int index, Class<MSG> messageType, BiConsumer<MSG, FriendlyByteBuf> encoder, Function<FriendlyByteBuf, MSG> decoder, BiConsumer<MSG, Supplier<NetworkEvent.Context>> messageConsumer) {
this.registerMessage(index, this.maxNumberOfMessages, messageType, encoder, decoder, messageConsumer);
}

Expand All @@ -105,8 +105,7 @@ public <MSG> void registerMessage(int index, int maxNumberOfMessages, Class<MSG>
this.createSplittingConsumer(player).accept(msg, buffer);
};


this.CHANNEL.registerMessage(index, messageType, wrappedEncoder, this.createPacketCombiner().andThen(decoder), messageConsumer);
this.channel.registerMessage(index, messageType, wrappedEncoder, this.createPacketCombiner().andThen(decoder), messageConsumer);
}

private <MSG> BiConsumer<MSG, FriendlyByteBuf> createSplittingConsumer(ServerPlayer playerEntity) {
Expand Down Expand Up @@ -147,20 +146,20 @@ private <MSG> BiConsumer<MSG, FriendlyByteBuf> createSplittingConsumer(ServerPla
packetIndex++;
} else {
//Construct the split packet.
MessageSplitPacket splitPacketMessage = new MessageSplitPacket(comId, packetIndex++, subPacketData);
var splitPacketMessage = new MessageSplitPacket(comId, packetIndex++, subPacketData);

if (playerEntity == null) {
this.CHANNEL.send(PacketDistributor.SERVER.noArg(), splitPacketMessage);
this.channel.send(PacketDistributor.SERVER.noArg(), splitPacketMessage);
} else {
this.CHANNEL.send(PacketDistributor.PLAYER.with(() -> playerEntity), splitPacketMessage);
this.channel.send(PacketDistributor.PLAYER.with(() -> playerEntity), splitPacketMessage);
}
}

//Move our working index.
currentIndex += sliceSize;

if (packetIndex > maximumPackets) {
LogManager.getLogger().error("Failure Splitting Packets on Channel \"" + this.CHANNEL_ID + "\"." + " with " + MSG.getClass() + ". " +
LogManager.getLogger().error("Failure Splitting Packets on Channel \"" + this.channelId + "\"." + " with " + MSG.getClass() + ". " +
" Number of Packets sent " + (packetIndex - 1) + ", expected number of Packets " + expectedPackets + ", maximum number of packets for a message of this type " + this.packetMaximums.get(packetId));
failure = true;
break;
Expand Down Expand Up @@ -192,12 +191,12 @@ private Function<FriendlyByteBuf, FriendlyByteBuf> createPacketCombiner() {

int comId = buf.readInt();

Map<Integer, byte[]> partsMap = packageCache.get(comId);
Map<Integer, byte[]> partsMap = PACKAGE_CACHE.get(comId);
if (partsMap == null || partsMap.size() != size - 1) {
int partSize = partsMap == null ? 0 : partsMap.size();
int id = buf.readUnsignedByte();
int max = this.packetMaximums.get(id) == null ? 0 : this.packetMaximums.get(id);
throw new PacketSplittingException(this.CHANNEL_ID, partSize, size, max, id);
throw new PacketSplittingException(this.channelId, partSize, size, max, id);
}

//Add data that came from this packet
Expand All @@ -213,16 +212,16 @@ private Function<FriendlyByteBuf, FriendlyByteBuf> createPacketCombiner() {
FriendlyByteBuf buffer = new FriendlyByteBuf(Unpooled.wrappedBuffer(packetData));

//remove data from cache
packageCache.remove(comId);
PACKAGE_CACHE.remove(comId);
return buffer;
};
}

public void addPackagePart(int communicationId, int packetIndex, byte[] payload) {
//Sync on the message cache since this is still on the Netty thread.
synchronized (PacketSplitter.packageCache) {
PacketSplitter.packageCache.computeIfAbsent(communicationId, (id) -> new ConcurrentHashMap<>());
PacketSplitter.packageCache.get(communicationId).put(packetIndex, payload);
synchronized (PacketSplitter.PACKAGE_CACHE) {
PacketSplitter.PACKAGE_CACHE.computeIfAbsent(communicationId, (id) -> new ConcurrentHashMap<>());
PacketSplitter.PACKAGE_CACHE.get(communicationId).put(packetIndex, payload);
}
}
}
Expand Down

0 comments on commit 1bd67a9

Please sign in to comment.