diff --git a/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java index 8710aa61a..feeb5c481 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java @@ -17,6 +17,7 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.rsocket.ConnectionSetupPayload; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.SetupFrameFlyweight; @@ -40,7 +41,8 @@ public boolean hasMetadata() { @Override public ByteBuf sliceMetadata() { - return SetupFrameFlyweight.metadata(setupFrame); + final ByteBuf metadata = SetupFrameFlyweight.metadata(setupFrame); + return metadata == null ? Unpooled.EMPTY_BUFFER : metadata; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 4d7d47f6a..c3cbbdfa6 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -212,12 +212,7 @@ private Mono handleFireAndForget(Payload payload) { return UnicastMonoEmpty.newInstance( () -> { ByteBuf requestFrame = - RequestFireAndForgetFrameFlyweight.encode( - allocator, - streamId, - false, - payload.hasMetadata() ? payload.sliceMetadata().retain() : null, - payload.sliceData().retain()); + RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); payload.release(); sendProcessor.onNext(requestFrame); @@ -245,12 +240,7 @@ private Mono handleRequestResponse(final Payload payload) { @Override public void doOnSubscribe() { final ByteBuf requestFrame = - RequestResponseFrameFlyweight.encode( - allocator, - streamId, - false, - payload.sliceMetadata().retain(), - payload.sliceData().retain()); + RequestResponseFrameFlyweight.encode(allocator, streamId, payload); payload.release(); sendProcessor.onNext(requestFrame); @@ -302,15 +292,9 @@ private Flux handleRequestStream(final Payload payload) { public void accept(long n) { if (firstRequest && !receiver.isDisposed()) { firstRequest = false; - sendProcessor.onNext( - RequestStreamFrameFlyweight.encode( - allocator, - streamId, - false, - n, - payload.sliceMetadata().retain(), - payload.sliceData().retain())); if (!payloadReleasedFlag.getAndSet(true)) { + sendProcessor.onNext( + RequestStreamFrameFlyweight.encode(allocator, streamId, n, payload)); payload.release(); } } else if (contains(streamId) && !receiver.isDisposed()) { @@ -399,8 +383,7 @@ protected void hookOnNext(Payload payload) { receiver.onError(t); return; } - final ByteBuf frame = - PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload); + final ByteBuf frame = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload); sendProcessor.onNext(frame); payload.release(); @@ -445,13 +428,7 @@ public void accept(long n) { if (!payloadReleasedFlag.getAndSet(true)) { ByteBuf frame = RequestChannelFrameFlyweight.encode( - allocator, - streamId, - false, - false, - n, - initialPayload.sliceMetadata().retain(), - initialPayload.sliceData().retain()); + allocator, streamId, false, n, initialPayload); sendProcessor.onNext(frame); @@ -604,8 +581,8 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { { Subscription sender = senders.get(streamId); if (sender != null) { - int n = RequestNFrameFlyweight.requestN(frame); - sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); + long n = RequestNFrameFlyweight.requestN(frame); + sender.request(n); } break; } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index c41f99ff5..3860b8dae 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -35,6 +35,7 @@ import io.rsocket.internal.UnboundedProcessor; import io.rsocket.lease.ResponderLeaseHandler; import java.util.function.Consumer; +import java.util.function.LongConsumer; import javax.annotation.Nullable; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; @@ -300,12 +301,12 @@ private void handleFrame(ByteBuf frame) { handleRequestN(streamId, frame); break; case REQUEST_STREAM: - int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); + long streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); Payload streamPayload = payloadDecoder.apply(frame); handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null); break; case REQUEST_CHANNEL: - int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); + long channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); Payload channelPayload = payloadDecoder.apply(frame); handleChannel(streamId, channelPayload, channelInitialRequestN); break; @@ -436,14 +437,14 @@ protected void hookFinally(SignalType type) { private void handleStream( int streamId, Flux response, - int initialRequestN, + long initialRequestN, @Nullable UnicastProcessor requestChannel) { final BaseSubscriber subscriber = new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription s) { - s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); + s.request(initialRequestN); } @Override @@ -522,14 +523,30 @@ protected void hookFinally(SignalType type) { .subscribe(subscriber); } - private void handleChannel(int streamId, Payload payload, int initialRequestN) { + private void handleChannel(int streamId, Payload payload, long initialRequestN) { UnicastProcessor frames = UnicastProcessor.create(); channelProcessors.put(streamId, frames); Flux payloads = frames .doOnRequest( - l -> sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, l))) + new LongConsumer() { + boolean first = true; + + @Override + public void accept(long l) { + long n; + if (first) { + first = false; + n = l - 1L; + } else { + n = l; + } + if (n > 0) { + sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); + } + } + }) .doFinally( signalType -> { if (channelProcessors.remove(streamId, frames)) { @@ -585,8 +602,8 @@ private void handleRequestN(int streamId, ByteBuf frame) { Subscription subscription = sendingSubscriptions.get(streamId); if (subscription != null) { - int n = RequestNFrameFlyweight.requestN(frame); - subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); + long n = RequestNFrameFlyweight.requestN(frame); + subscription.request(n); } } } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index d8537ec1a..1a8d242b2 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -166,8 +166,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) { header = frame.copy(frame.readerIndex(), FrameHeaderFlyweight.size()); if (frameType == FrameType.REQUEST_CHANNEL || frameType == FrameType.REQUEST_STREAM) { - int i = RequestChannelFrameFlyweight.initialRequestN(frame); - header.writeInt(i); + long i = RequestChannelFrameFlyweight.initialRequestN(frame); + header.writeInt(i > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) i); } putHeader(streamId, header); } @@ -261,10 +261,16 @@ void reassembleFrame(ByteBuf frame, SynchronousSink sink) { private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf header) { ByteBuf metadata; CompositeByteBuf cm = removeMetadata(streamId); - if (cm != null) { - metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain()); + + ByteBuf decodedMetadata = PayloadFrameFlyweight.metadata(frame); + if (decodedMetadata != null) { + if (cm != null) { + metadata = cm.addComponents(true, decodedMetadata.retain()); + } else { + metadata = PayloadFrameFlyweight.metadata(frame).retain(); + } } else { - metadata = PayloadFrameFlyweight.metadata(frame).retain(); + metadata = cm != null ? cm : null; } ByteBuf data = assembleData(frame, streamId); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java index 41e74b469..ea54aa374 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java @@ -30,40 +30,35 @@ private static int decodeLength(final ByteBuf byteBuf) { return length; } - static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) { - return allocator.compositeBuffer(2).addComponents(true, header, data); - } - static ByteBuf encode( - ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) { - - int length = metadata.readableBytes(); - encodeLength(header, length); - return allocator.compositeBuffer(3).addComponents(true, header, metadata, data); - } - - static ByteBuf encode(ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) { + ByteBufAllocator allocator, + final ByteBuf header, + ByteBuf metadata, + boolean hasMetadata, + ByteBuf data) { - int length = metadata.readableBytes(); - encodeLength(header, length); - return allocator.compositeBuffer(2).addComponents(true, header, metadata); - } + final boolean addData = data != null && data.isReadable(); + final boolean addMetadata = hasMetadata && metadata.isReadable(); - static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { if (hasMetadata) { - int length = decodeLength(byteBuf); - return byteBuf.readSlice(length); + int length = metadata.readableBytes(); + encodeLength(header, length); + } + + if (addMetadata && addData) { + return allocator.compositeBuffer(3).addComponents(true, header, metadata, data); + } else if (addMetadata) { + return allocator.compositeBuffer(2).addComponents(true, header, metadata); + } else if (addData) { + return allocator.compositeBuffer(2).addComponents(true, header, data); } else { - return Unpooled.EMPTY_BUFFER; + return header; } } - static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) { - byteBuf.markReaderIndex(); - byteBuf.skipBytes(6); - ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata); - byteBuf.resetReaderIndex(); - return metadata; + static ByteBuf metadataWithoutMarking(ByteBuf byteBuf) { + int length = decodeLength(byteBuf); + return byteBuf.readSlice(length); } static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { @@ -78,12 +73,4 @@ static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { return Unpooled.EMPTY_BUFFER; } } - - static ByteBuf data(ByteBuf byteBuf, boolean hasMetadata) { - byteBuf.markReaderIndex(); - byteBuf.skipBytes(6); - ByteBuf data = dataWithoutMarking(byteBuf, hasMetadata); - byteBuf.resetReaderIndex(); - return data; - } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java index d8efad72d..8cb01b08f 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java @@ -14,8 +14,7 @@ public static ByteBuf encode( @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); + final boolean hasMetadata = metadata != null; int flags = FrameHeaderFlyweight.FLAGS_I; @@ -25,15 +24,8 @@ public static ByteBuf encode( final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags); header.writeInt(extendedType); - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } public static int extendedType(ByteBuf byteBuf) { @@ -61,10 +53,13 @@ public static ByteBuf metadata(ByteBuf byteBuf) { FrameHeaderFlyweight.ensureFrameType(FrameType.EXT, byteBuf); boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); // Extended type byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java index b1aa6ff99..a91d52782 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java @@ -13,17 +13,7 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B public static ByteBuf encode( final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); - - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + final boolean hasMetadata = metadata != null; + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java index e4e6029b3..b591412a6 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java @@ -29,7 +29,7 @@ public static ByteBuf encode( header.writeLong(lp); - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + return DataAndMetadataFlyweight.encode(allocator, header, null, false, data); } public static boolean respondFlag(ByteBuf byteBuf) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java index 96737d7b3..039c72886 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java @@ -14,7 +14,7 @@ public static ByteBuf encode( @Nullable final ByteBuf metadata) { final boolean hasMetadata = metadata != null; - final boolean writesMetadata = hasMetadata && metadata.isReadable(); + final boolean addMetadata = hasMetadata && metadata.isReadable(); int flags = 0; @@ -27,7 +27,7 @@ public static ByteBuf encode( .writeInt(ttl) .writeInt(numRequests); - if (writesMetadata) { + if (addMetadata) { return allocator.compositeBuffer(2).addComponents(true, header, metadata); } else { return header; diff --git a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java index 4f67d9c72..1b5204036 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java @@ -21,47 +21,24 @@ public static ByteBuf encode( allocator, streamId, fragmentFollows, complete, next, 0, metadata, data); } - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - boolean complete, - boolean next, - Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - fragmentFollows, - complete, - next, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { + return encode(allocator, streamId, false, payload); } public static ByteBuf encodeNextComplete( ByteBufAllocator allocator, int streamId, Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - false, - true, - true, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + + return encode(allocator, streamId, true, payload); } - public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - false, - false, - true, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + static ByteBuf encode( + ByteBufAllocator allocator, int streamId, boolean complete, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); + + return FLYWEIGHT.encode(allocator, streamId, false, complete, true, 0, metadata, data); } public static ByteBuf encodeComplete(ByteBufAllocator allocator, int streamId) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java index 06ddcda03..2a644b07a 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java @@ -13,22 +13,15 @@ private RequestChannelFrameFlyweight() {} public static ByteBuf encode( ByteBufAllocator allocator, int streamId, - boolean fragmentFollows, boolean complete, - long requestN, + long initialRequestN, Payload payload) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - return FLYWEIGHT.encode( - allocator, - streamId, - fragmentFollows, - complete, - false, - reqN, - payload.metadata(), - payload.data()); + return encode(allocator, streamId, false, complete, initialRequestN, metadata, data); } public static ByteBuf encode( @@ -36,11 +29,15 @@ public static ByteBuf encode( int streamId, boolean fragmentFollows, boolean complete, - long requestN, + long initialRequestN, ByteBuf metadata, ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + if (initialRequestN < 1) { + throw new IllegalArgumentException("request n is less than 1"); + } + + int reqN = initialRequestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) initialRequestN; return FLYWEIGHT.encode( allocator, streamId, fragmentFollows, complete, false, reqN, metadata, data); @@ -54,7 +51,8 @@ public static ByteBuf metadata(ByteBuf byteBuf) { return FLYWEIGHT.metadataWithRequestN(byteBuf); } - public static int initialRequestN(ByteBuf byteBuf) { - return FLYWEIGHT.initialRequestN(byteBuf); + public static long initialRequestN(ByteBuf byteBuf) { + int requestN = FLYWEIGHT.initialRequestN(byteBuf); + return requestN == Integer.MAX_VALUE ? Long.MAX_VALUE : requestN; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java index 5f2d606e4..13b7d907d 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java @@ -10,11 +10,13 @@ public class RequestFireAndForgetFrameFlyweight { private RequestFireAndForgetFrameFlyweight() {} - public static ByteBuf encode( - ByteBufAllocator allocator, int streamId, boolean fragmentFollows, Payload payload) { + public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - return FLYWEIGHT.encode( - allocator, streamId, fragmentFollows, payload.metadata(), payload.data()); + return FLYWEIGHT.encode(allocator, streamId, false, metadata, data); } public static ByteBuf encode( diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java index f0a99317a..15fac9f55 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java @@ -30,8 +30,7 @@ ByteBuf encode( @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); + final boolean hasMetadata = metadata != null; int flags = 0; @@ -57,15 +56,7 @@ ByteBuf encode( header.writeInt(requestN); } - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } ByteBuf data(ByteBuf byteBuf) { @@ -79,9 +70,12 @@ ByteBuf data(ByteBuf byteBuf) { ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size()); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } @@ -97,9 +91,12 @@ ByteBuf dataWithRequestN(ByteBuf byteBuf) { ByteBuf metadataWithRequestN(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java index 5a4c4c273..fe2c752cf 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java @@ -8,26 +8,23 @@ private RequestNFrameFlyweight() {} public static ByteBuf encode( final ByteBufAllocator allocator, final int streamId, long requestN) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; - return encode(allocator, streamId, reqN); - } - - public static ByteBuf encode(final ByteBufAllocator allocator, final int streamId, int requestN) { - ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.REQUEST_N, 0); if (requestN < 1) { throw new IllegalArgumentException("request n is less than 1"); } - return header.writeInt(requestN); + int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + + ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.REQUEST_N, 0); + return header.writeInt(reqN); } - public static int requestN(ByteBuf byteBuf) { + public static long requestN(ByteBuf byteBuf) { FrameHeaderFlyweight.ensureFrameType(FrameType.REQUEST_N, byteBuf); byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size()); int i = byteBuf.readInt(); byteBuf.resetReaderIndex(); - return i; + return i == Integer.MAX_VALUE ? Long.MAX_VALUE : i; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java index 2e06c9b82..d328c7fa2 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java @@ -10,9 +10,13 @@ public class RequestResponseFrameFlyweight { private RequestResponseFrameFlyweight() {} - public static ByteBuf encode( - ByteBufAllocator allocator, int streamId, boolean fragmentFollows, Payload payload) { - return encode(allocator, streamId, fragmentFollows, payload.metadata(), payload.data()); + public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); + + return encode(allocator, streamId, false, metadata, data); } public static ByteBuf encode( diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java index 171c41990..1c06d80c7 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java @@ -11,45 +11,31 @@ public class RequestStreamFrameFlyweight { private RequestStreamFrameFlyweight() {} public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - long requestN, - Payload payload) { - return encode( - allocator, streamId, fragmentFollows, requestN, payload.metadata(), payload.data()); - } + ByteBufAllocator allocator, int streamId, long initialRequestN, Payload payload) { - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - int requestN, - Payload payload) { - return encode( - allocator, streamId, fragmentFollows, requestN, payload.metadata(), payload.data()); - } + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - long requestN, - ByteBuf metadata, - ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; - return encode(allocator, streamId, fragmentFollows, reqN, metadata, data); + return encode(allocator, streamId, false, initialRequestN, metadata, data); } public static ByteBuf encode( ByteBufAllocator allocator, int streamId, boolean fragmentFollows, - int requestN, + long initialRequestN, ByteBuf metadata, ByteBuf data) { + + if (initialRequestN < 1) { + throw new IllegalArgumentException("request n is less than 1"); + } + + int reqN = initialRequestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) initialRequestN; + return FLYWEIGHT.encode( - allocator, streamId, fragmentFollows, false, false, requestN, metadata, data); + allocator, streamId, fragmentFollows, false, false, reqN, metadata, data); } public static ByteBuf data(ByteBuf byteBuf) { @@ -60,7 +46,8 @@ public static ByteBuf metadata(ByteBuf byteBuf) { return FLYWEIGHT.metadataWithRequestN(byteBuf); } - public static int initialRequestN(ByteBuf byteBuf) { - return FLYWEIGHT.initialRequestN(byteBuf); + public static long initialRequestN(ByteBuf byteBuf) { + int requestN = FLYWEIGHT.initialRequestN(byteBuf); + return requestN == Integer.MAX_VALUE ? Long.MAX_VALUE : requestN; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java index cfd0ca7bc..bfb73fe22 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java @@ -56,7 +56,6 @@ public static ByteBuf encode( final Payload setupPayload) { final ByteBuf data = setupPayload.sliceData(); - final boolean hasData = data.isReadable(); final boolean hasMetadata = setupPayload.hasMetadata(); final ByteBuf metadata = hasMetadata ? setupPayload.sliceMetadata() : null; @@ -94,15 +93,7 @@ public static ByteBuf encode( header.writeByte(length); ByteBufUtil.writeUtf8(header, dataMimeType); - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } public static int version(ByteBuf byteBuf) { @@ -197,9 +188,12 @@ public static String dataMimeType(ByteBuf byteBuf) { public static ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); skipToPayload(byteBuf); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java index 692dcb363..0a77e3820 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java @@ -3,8 +3,15 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.Payload; -import io.rsocket.frame.*; -import io.rsocket.util.ByteBufPayload; +import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.MetadataPushFrameFlyweight; +import io.rsocket.frame.PayloadFrameFlyweight; +import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; +import io.rsocket.frame.RequestResponseFrameFlyweight; +import io.rsocket.frame.RequestStreamFrameFlyweight; +import io.rsocket.util.DefaultPayload; import java.nio.ByteBuffer; /** Default Frame decoder that copies the frames contents for easy of use. */ @@ -45,14 +52,18 @@ public Payload apply(ByteBuf byteBuf) { throw new IllegalArgumentException("unsupported frame type: " + type); } - ByteBuffer metadata = ByteBuffer.allocateDirect(m.readableBytes()); ByteBuffer data = ByteBuffer.allocateDirect(d.readableBytes()); - data.put(d.nioBuffer()); data.flip(); - metadata.put(m.nioBuffer()); - metadata.flip(); - return ByteBufPayload.create(data, metadata); + if (m != null) { + ByteBuffer metadata = ByteBuffer.allocateDirect(m.readableBytes()); + metadata.put(m.nioBuffer()); + metadata.flip(); + + return DefaultPayload.create(data, metadata); + } + + return DefaultPayload.create(data); } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java index 0b63590e8..c92f82428 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java @@ -3,7 +3,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.Payload; -import io.rsocket.frame.*; +import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.MetadataPushFrameFlyweight; +import io.rsocket.frame.PayloadFrameFlyweight; +import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; +import io.rsocket.frame.RequestResponseFrameFlyweight; +import io.rsocket.frame.RequestStreamFrameFlyweight; import io.rsocket.util.ByteBufPayload; /** @@ -46,6 +53,6 @@ public Payload apply(ByteBuf byteBuf) { throw new IllegalArgumentException("unsupported frame type: " + type); } - return ByteBufPayload.create(d.retain(), m.retain()); + return ByteBufPayload.create(d.retain(), m != null ? m.retain() : null); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java index 90bb6e998..ea3142d25 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java @@ -69,7 +69,7 @@ void testSetupPayloadWithEmptyMetadata() { ConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(frame); assertFalse(setupPayload.willClientHonorLease()); - assertFalse(setupPayload.hasMetadata()); + assertTrue(setupPayload.hasMetadata()); assertNotNull(setupPayload.metadata()); assertEquals(0, setupPayload.metadata().readableBytes()); assertEquals(payload.data(), setupPayload.data()); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 36abb14b4..da2755cc3 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -20,6 +20,7 @@ import static io.rsocket.frame.FrameHeaderFlyweight.frameType; import static io.rsocket.frame.FrameType.CANCEL; import static io.rsocket.frame.FrameType.REQUEST_CHANNEL; +import static io.rsocket.frame.FrameType.REQUEST_FNF; import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.hamcrest.MatcherAssert.assertThat; @@ -35,6 +36,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCounted; import io.rsocket.Payload; @@ -74,7 +76,9 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.runners.model.Statement; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -625,6 +629,103 @@ public void simpleOnDiscardRequestChannelTest2() { rule.assertHasNoLeaks(); } + @ParameterizedTest + @MethodSource("encodeDecodePayloadCases") + public void verifiesThatFrameWithNoMetadataHasDecodedCorrectlyIntoPayload( + FrameType frameType, int framesCnt, int responsesCnt) { + ByteBufAllocator allocator = rule.alloc(); + AssertSubscriber assertSubscriber = AssertSubscriber.create(responsesCnt); + TestPublisher testPublisher = TestPublisher.create(); + + Publisher response; + + switch (frameType) { + case REQUEST_FNF: + response = + testPublisher.mono().flatMap(p -> rule.socket.fireAndForget(p).then(Mono.empty())); + break; + case REQUEST_RESPONSE: + response = testPublisher.mono().flatMap(p -> rule.socket.requestResponse(p)); + break; + case REQUEST_STREAM: + response = testPublisher.mono().flatMapMany(p -> rule.socket.requestStream(p)); + break; + case REQUEST_CHANNEL: + response = rule.socket.requestChannel(testPublisher.flux()); + break; + default: + throw new UnsupportedOperationException("illegal case"); + } + + response.subscribe(assertSubscriber); + testPublisher.next(ByteBufPayload.create("d")); + + int streamId = rule.getStreamIdForRequestType(frameType); + + if (responsesCnt > 0) { + for (int i = 0; i < responsesCnt - 1; i++) { + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + streamId, + false, + false, + true, + null, + Unpooled.wrappedBuffer(("rd" + (i + 1)).getBytes()))); + } + + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + streamId, + false, + true, + true, + null, + Unpooled.wrappedBuffer(("rd" + responsesCnt).getBytes()))); + } + + if (framesCnt > 1) { + rule.connection.addToReceivedBuffer( + RequestNFrameFlyweight.encode(allocator, streamId, framesCnt)); + } + + for (int i = 1; i < framesCnt; i++) { + testPublisher.next(ByteBufPayload.create("d" + i)); + } + + Assertions.assertThat(rule.connection.getSent()) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames sent", frameType, framesCnt) + .hasSize(framesCnt) + .allMatch(bb -> !FrameHeaderFlyweight.hasMetadata(bb)) + .allMatch(ByteBuf::release); + + Assertions.assertThat(assertSubscriber.isTerminated()) + .describedAs("Interaction Type :[%s]. Expected to be terminated", frameType) + .isTrue(); + + Assertions.assertThat(assertSubscriber.values()) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames received", + frameType, responsesCnt) + .hasSize(responsesCnt) + .allMatch(p -> !p.hasMetadata()) + .allMatch(p -> p.release()); + + rule.assertHasNoLeaks(); + rule.connection.clearSendReceiveBuffers(); + } + + static Stream encodeDecodePayloadCases() { + return Stream.of( + Arguments.of(REQUEST_FNF, 1, 0), + Arguments.of(REQUEST_RESPONSE, 1, 1), + Arguments.of(REQUEST_STREAM, 1, 5), + Arguments.of(REQUEST_CHANNEL, 5, 5)); + } + public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 05f9fd46e..726be0770 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -19,6 +19,8 @@ import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE; import static io.rsocket.frame.FrameHeaderFlyweight.frameType; import static io.rsocket.frame.FrameType.REQUEST_CHANNEL; +import static io.rsocket.frame.FrameType.REQUEST_FNF; +import static io.rsocket.frame.FrameType.REQUEST_N; import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.hamcrest.MatcherAssert.assertThat; @@ -45,6 +47,7 @@ import io.rsocket.frame.KeepAliveFrameFlyweight; import io.rsocket.frame.PayloadFrameFlyweight; import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; import io.rsocket.frame.RequestNFrameFlyweight; import io.rsocket.frame.RequestResponseFrameFlyweight; import io.rsocket.frame.RequestStreamFrameFlyweight; @@ -55,16 +58,21 @@ import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; +import io.rsocket.util.EmptyPayload; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.runners.model.Statement; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -78,6 +86,7 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.test.publisher.TestPublisher; import reactor.test.util.RaceTestUtils; public class RSocketResponderTest { @@ -605,6 +614,108 @@ public Flux requestChannel(Publisher payloads) { rule.assertHasNoLeaks(); } + @ParameterizedTest + @MethodSource("encodeDecodePayloadCases") + public void verifiesThatFrameWithNoMetadataHasDecodedCorrectlyIntoPayload( + FrameType frameType, int framesCnt, int responsesCnt) { + ByteBufAllocator allocator = rule.alloc(); + AssertSubscriber assertSubscriber = AssertSubscriber.create(framesCnt); + TestPublisher testPublisher = TestPublisher.create(); + + rule.setAcceptingSocket( + new AbstractRSocket() { + @Override + public Mono fireAndForget(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return Mono.empty(); + } + + @Override + public Mono requestResponse(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return testPublisher.mono(); + } + + @Override + public Flux requestStream(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return testPublisher.flux(); + } + + @Override + public Flux requestChannel(Publisher payloads) { + payloads.subscribe(assertSubscriber); + return testPublisher.flux(); + } + }, + 1); + + rule.sendRequest(1, frameType, ByteBufPayload.create("d")); + + // if responses number is bigger than 1 we have to send one extra requestN + if (responsesCnt > 1) { + rule.connection.addToReceivedBuffer( + RequestNFrameFlyweight.encode(allocator, 1, responsesCnt - 1)); + } + + // respond with specific number of elements + for (int i = 0; i < responsesCnt; i++) { + testPublisher.next(ByteBufPayload.create("rd" + i)); + } + + // Listen to incoming frames. Valid for RequestChannel case only + if (framesCnt > 1) { + for (int i = 1; i < responsesCnt; i++) { + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + 1, + false, + false, + true, + null, + Unpooled.wrappedBuffer(("d" + (i + 1)).getBytes()))); + } + } + + if (responsesCnt > 0) { + Assertions.assertThat( + rule.connection.getSent().stream().filter(bb -> frameType(bb) != REQUEST_N)) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames sent", frameType, responsesCnt) + .hasSize(responsesCnt) + .allMatch(bb -> !FrameHeaderFlyweight.hasMetadata(bb)); + } + + if (framesCnt > 1) { + Assertions.assertThat( + rule.connection.getSent().stream().filter(bb -> frameType(bb) == REQUEST_N)) + .describedAs( + "Interaction Type :[%s]. Expected to observe single RequestN(%s) frame", + frameType, framesCnt - 1) + .hasSize(1) + .first() + .matches(bb -> RequestNFrameFlyweight.requestN(bb) == (framesCnt - 1)); + } + + Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); + + Assertions.assertThat(assertSubscriber.awaitAndAssertNextValueCount(framesCnt).values()) + .hasSize(framesCnt) + .allMatch(p -> !p.hasMetadata()) + .allMatch(ReferenceCounted::release); + + rule.assertHasNoLeaks(); + } + + static Stream encodeDecodePayloadCases() { + return Stream.of( + Arguments.of(REQUEST_FNF, 1, 0), + Arguments.of(REQUEST_RESPONSE, 1, 1), + Arguments.of(REQUEST_STREAM, 1, 5), + Arguments.of(REQUEST_CHANNEL, 5, 5)); + } + public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket; @@ -653,34 +764,25 @@ protected RSocketResponder newRSocket(LeaksTrackingByteBufAllocator allocator) { } private void sendRequest(int streamId, FrameType frameType) { + sendRequest(streamId, frameType, EmptyPayload.INSTANCE); + } + + private void sendRequest(int streamId, FrameType frameType, Payload payload) { ByteBuf request; switch (frameType) { case REQUEST_CHANNEL: request = - RequestChannelFrameFlyweight.encode( - allocator, - streamId, - false, - false, - prefetch, - Unpooled.EMPTY_BUFFER, - Unpooled.EMPTY_BUFFER); + RequestChannelFrameFlyweight.encode(allocator, streamId, false, prefetch, payload); break; case REQUEST_STREAM: - request = - RequestStreamFrameFlyweight.encode( - allocator, - streamId, - false, - prefetch, - Unpooled.EMPTY_BUFFER, - Unpooled.EMPTY_BUFFER); + request = RequestStreamFrameFlyweight.encode(allocator, streamId, prefetch, payload); break; case REQUEST_RESPONSE: - request = - RequestResponseFrameFlyweight.encode( - allocator, streamId, false, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); + request = RequestResponseFrameFlyweight.encode(allocator, streamId, payload); + break; + case REQUEST_FNF: + request = RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); break; default: throw new IllegalArgumentException("unsupported type: " + frameType); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java index f5a013357..c6b1735e6 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.frame.*; -import io.rsocket.util.DefaultPayload; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; import org.junit.jupiter.api.DisplayName; @@ -43,14 +42,17 @@ final class FrameFragmenterTest { @Test void testGettingData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf fnf = - RequestFireAndForgetFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestFireAndForgetFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf rs = - RequestStreamFrameFlyweight.encode(allocator, 1, true, 1, DefaultPayload.create(data)); + RequestStreamFrameFlyweight.encode( + allocator, 1, true, 1, null, Unpooled.wrappedBuffer(data)); ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 1, DefaultPayload.create(data)); + allocator, 1, true, false, 1, null, Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getData(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.wrappedBuffer(data)); @@ -73,16 +75,22 @@ void testGettingData() { void testGettingMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf fnf = RequestFireAndForgetFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf rs = RequestStreamFrameFlyweight.encode( - allocator, 1, true, 1, DefaultPayload.create(data, metadata)); + allocator, 1, true, 1, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 1, DefaultPayload.create(data, metadata)); + allocator, + 1, + true, + false, + 1, + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getMetadata(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.wrappedBuffer(metadata)); @@ -104,7 +112,8 @@ void testGettingMetadata() { @Test void returnEmptBufferWhenNoMetadataPresent() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getMetadata(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.EMPTY_BUFFER); @@ -115,7 +124,8 @@ void returnEmptBufferWhenNoMetadataPresent() { @Test void encodeFirstFrameWithData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -144,7 +154,7 @@ void encodeFirstFrameWithData() { void encodeFirstWithDataChannel() { ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 10, DefaultPayload.create(data)); + allocator, 1, true, false, 10, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -173,7 +183,8 @@ void encodeFirstWithDataChannel() { @Test void encodeFirstWithDataStream() { ByteBuf rc = - RequestStreamFrameFlyweight.encode(allocator, 1, true, 50, DefaultPayload.create(data)); + RequestStreamFrameFlyweight.encode( + allocator, 1, true, 50, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -203,10 +214,7 @@ void encodeFirstWithDataStream() { void encodeFirstFrameWithMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -234,7 +242,7 @@ void encodeFirstFrameWithMetadata() { void encodeFirstWithDataAndMetadataStream() { ByteBuf rc = RequestStreamFrameFlyweight.encode( - allocator, 1, true, 50, DefaultPayload.create(data, metadata)); + allocator, 1, true, 50, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -266,7 +274,8 @@ void encodeFirstWithDataAndMetadataStream() { @Test void fragmentData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); @@ -293,11 +302,7 @@ void fragmentData() { void fragmentMetadata() { ByteBuf rr = RequestStreamFrameFlyweight.encode( - allocator, - 1, - true, - 10, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))); + allocator, 1, true, 10, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_STREAM, false); @@ -324,7 +329,7 @@ void fragmentMetadata() { void fragmentDataAndMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java index 6e0d0dc1b..13632165b 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java @@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import io.rsocket.frame.*; -import io.rsocket.util.DefaultPayload; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -48,15 +47,16 @@ final class FrameReassemblerTest { void reassembleData() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)), + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -88,7 +88,8 @@ void reassembleData() { void passthrough() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, false, DefaultPayload.create(data))); + RequestResponseFrameFlyweight.encode( + allocator, 1, false, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -115,38 +116,39 @@ void reassembleMetadata() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -184,35 +186,40 @@ void reassembleMetadataChannel() { true, false, 100, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -249,39 +256,39 @@ void reassembleMetadataStream() { List byteBufs = Arrays.asList( RequestStreamFrameFlyweight.encode( - allocator, - 1, - true, - 250, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, 250, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -319,34 +326,33 @@ void reassembleMetadataAndData() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -387,32 +393,31 @@ public void cancelBeforeAssembling() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); Flux.fromIterable(byteBufs).handle(reassembler::reassembleFrame).blockLast(); @@ -436,32 +441,31 @@ public void dispose() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); Flux.fromIterable(byteBufs).handle(reassembler::reassembleFrame).blockLast(); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java index b21a7c9da..c5abce339 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java @@ -30,7 +30,6 @@ import io.rsocket.frame.FrameType; import io.rsocket.frame.PayloadFrameFlyweight; import io.rsocket.frame.RequestResponseFrameFlyweight; -import io.rsocket.util.DefaultPayload; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -59,15 +58,16 @@ final class ReassembleDuplexConnectionTest { void reassembleData() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)), + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); CompositeByteBuf data = allocator @@ -99,38 +99,39 @@ void reassembleMetadata() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); CompositeByteBuf metadata = allocator @@ -164,34 +165,33 @@ void reassembleMetadataAndData() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); CompositeByteBuf data = allocator @@ -230,7 +230,7 @@ void reassembleMetadataAndData() { void reassembleNonFragment() { ByteBuf encode = RequestResponseFrameFlyweight.encode( - allocator, 1, false, DefaultPayload.create(Unpooled.wrappedBuffer(data))); + allocator, 1, false, null, Unpooled.wrappedBuffer(data)); when(delegate.receive()).thenReturn(Flux.just(encode)); when(delegate.onClose()).thenReturn(Mono.never()); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java deleted file mode 100644 index 6f9113d73..000000000 --- a/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.rsocket.frame; - -import io.netty.buffer.*; -import org.junit.jupiter.api.Test; - -class DataAndMetadataFlyweightTest { - @Test - void testEncodeData() { - ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.PAYLOAD, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf frame = DataAndMetadataFlyweight.encodeOnlyData(ByteBufAllocator.DEFAULT, header, data); - ByteBuf d = DataAndMetadataFlyweight.data(frame, false); - String s = ByteBufUtil.prettyHexDump(d); - System.out.println(s); - } - - @Test - void testEncodeMetadata() { - ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.PAYLOAD, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf frame = - DataAndMetadataFlyweight.encodeOnlyMetadata(ByteBufAllocator.DEFAULT, header, data); - ByteBuf d = DataAndMetadataFlyweight.data(frame, false); - String s = ByteBufUtil.prettyHexDump(d); - System.out.println(s); - } - - @Test - void testEncodeDataAndMetadata() { - ByteBuf header = - FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.REQUEST_RESPONSE, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf metadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf frame = - DataAndMetadataFlyweight.encode(ByteBufAllocator.DEFAULT, header, metadata, data); - ByteBuf m = DataAndMetadataFlyweight.metadata(frame, true); - String s = ByteBufUtil.prettyHexDump(m); - System.out.println(s); - FrameType frameType = FrameHeaderFlyweight.frameType(frame); - System.out.println(frameType); - - for (int i = 0; i < 10_000_000; i++) { - ByteBuf d1 = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf m1 = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf h1 = - FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.REQUEST_RESPONSE, 0); - ByteBuf f1 = DataAndMetadataFlyweight.encode(ByteBufAllocator.DEFAULT, h1, m1, d1); - f1.release(); - } - } -} diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java index e337d4332..eea72c03e 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java @@ -35,7 +35,7 @@ void extensionData() { Assertions.assertFalse(FrameHeaderFlyweight.hasMetadata(extension)); Assertions.assertEquals(extendedType, ExtensionFrameFlyweight.extendedType(extension)); - Assertions.assertEquals(0, ExtensionFrameFlyweight.metadata(extension).readableBytes()); + Assertions.assertNull(ExtensionFrameFlyweight.metadata(extension)); Assertions.assertEquals(data, ExtensionFrameFlyweight.data(extension)); extension.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java index 9ef89326a..e78adf9f1 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java @@ -31,7 +31,7 @@ void nextCompleteData() { String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(nextComplete); Assertions.assertEquals("d", data); - Assertions.assertTrue(metadata.readableBytes() == 0); + Assertions.assertNull(metadata); nextComplete.release(); } @@ -68,7 +68,18 @@ void nextData() { String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(next); Assertions.assertEquals("d", data); - Assertions.assertTrue(metadata.readableBytes() == 0); + Assertions.assertNull(metadata); + next.release(); + } + + @Test + void nextDataEmptyMetadata() { + Payload payload = DefaultPayload.create("d".getBytes(), new byte[0]); + ByteBuf next = PayloadFrameFlyweight.encodeNext(ByteBufAllocator.DEFAULT, 1, payload); + String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); + ByteBuf metadata = PayloadFrameFlyweight.metadata(next); + Assertions.assertEquals("d", data); + Assertions.assertEquals(metadata.readableBytes(), 0); next.release(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java index a82c3e658..c19d4e1f4 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java @@ -22,13 +22,15 @@ void testEncoding() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Metadata - // | | | | ⌌Encoded Data - // __|________|_________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("000010000000011900000000010000026d6464", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Metadata Length + // | | | | ⌌Encoded Metadata + // | | | | | ⌌Encoded Data + // __|________|_________|______|____|___| + // ↓ ↓↓ ↓↓ ↓↓ ↓↓ ↓↓↓ + String expected = "000010000000011900000000010000026d6464"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -44,12 +46,14 @@ void testEncodingWithEmptyMetadata() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Data - // __|________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Metadata Length (0) + // | | | | ⌌Encoded Data + // __|________|_________|_______|___| + // ↓ ↓↓ ↓↓ ↓↓ ↓↓↓ + String expected = "00000e0000000119000000000100000064"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -66,12 +70,13 @@ void testEncodingWithNullMetadata() { frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Data - // __|________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Data + // __|________|_________|_____| + // ↓<-> ↓↓ <-> ↓↓ <-> ↓↓↓ + String expected = "00000b0000000118000000000164"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -110,7 +115,7 @@ void requestResponseData() { assertFalse(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); - assertTrue(metadata.readableBytes() == 0); + assertNull(metadata); request.release(); } @@ -145,13 +150,13 @@ void requestStreamDataMetadata() { Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(Integer.MAX_VALUE, actualRequest); + assertEquals(Long.MAX_VALUE, actualRequest); assertEquals("md", metadata); assertEquals("d", data); request.release(); @@ -168,13 +173,13 @@ void requestStreamData() { null, Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); ByteBuf metadata = RequestStreamFrameFlyweight.metadata(request); assertFalse(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(42, actualRequest); - assertTrue(metadata.readableBytes() == 0); + assertEquals(42L, actualRequest); + assertNull(metadata); assertEquals("d", data); request.release(); } @@ -190,13 +195,13 @@ void requestStreamMetadata() { Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), Unpooled.EMPTY_BUFFER); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); ByteBuf data = RequestStreamFrameFlyweight.data(request); String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(42, actualRequest); + assertEquals(42L, actualRequest); assertTrue(data.readableBytes() == 0); assertEquals("md", metadata); request.release(); @@ -237,7 +242,7 @@ void requestFnfData() { assertFalse(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); - assertTrue(metadata.readableBytes() == 0); + assertNull(metadata); request.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java index 09aa8f325..2ad944d09 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java @@ -25,11 +25,11 @@ public void shouldIndicateThatItHasNotMetadata() { } @Test - public void shouldIndicateThatItHasNotMetadata1() { + public void shouldIndicateThatItHasMetadata1() { Payload payload = ByteBufPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); Assertions.assertThat(payload.release()).isTrue(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java index 7ebcd1a74..6bae0886b 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java @@ -60,18 +60,18 @@ public void shouldIndicateThatItHasNotMetadata() { } @Test - public void shouldIndicateThatItHasNotMetadata1() { + public void shouldIndicateThatItHasMetadata1() { Payload payload = DefaultPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); } @Test - public void shouldIndicateThatItHasNotMetadata2() { + public void shouldIndicateThatItHasMetadata2() { Payload payload = DefaultPayload.create(ByteBuffer.wrap("data".getBytes()), ByteBuffer.allocate(0)); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); } }