From c8c1f0c5146153a3ce59412dbba9711aad356dc8 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 22 Apr 2020 20:52:35 +0300 Subject: [PATCH 1/8] fixes incorrect request propagation Initially that issue was hidden because both sides uses limitRate which does prefetch 256 elements in advance so it is almost impossible to track underflow in request Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketResponder.java | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index f9c3d1c65..c41f99ff5 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -35,7 +35,6 @@ import io.rsocket.internal.UnboundedProcessor; import io.rsocket.lease.ResponderLeaseHandler; import java.util.function.Consumer; -import java.util.function.LongConsumer; import javax.annotation.Nullable; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; @@ -530,23 +529,7 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) { Flux payloads = frames .doOnRequest( - new LongConsumer() { - boolean first = true; - - @Override - public void accept(long l) { - long n; - if (first) { - first = false; - n = l - 1L; - } else { - n = l; - } - if (n > 0) { - sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); - } - } - }) + l -> sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, l))) .doFinally( signalType -> { if (channelProcessors.remove(streamId, frames)) { From 536596c06f8a3d77598df764f047a82fa04d972e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 17 Apr 2020 16:20:36 +0300 Subject: [PATCH 2/8] fixes behaviour of flight-weights when it gets unreadable buffers ensures that ByteBufPayload is not accessible anymore when it has been released Signed-off-by: Oleh Dokuka --- .../frame/DataAndMetadataFlyweight.java | 7 ++ .../frame/ExtensionFrameFlyweight.java | 17 +++-- .../rsocket/frame/FragmentationFlyweight.java | 13 ++-- .../io/rsocket/frame/LeaseFrameFlyweight.java | 8 ++- .../io/rsocket/frame/RequestFlyweight.java | 18 ++++-- .../io/rsocket/frame/SetupFrameFlyweight.java | 21 +++--- .../java/io/rsocket/util/ByteBufPayload.java | 31 ++++++++- .../java/io/rsocket/util/DefaultPayload.java | 2 +- .../core/ConnectionSetupPayloadTest.java | 2 +- .../rsocket/frame/RequestFlyweightTest.java | 20 +++++- .../io/rsocket/util/ByteBufPayloadTest.java | 64 +++++++++++++++++++ .../io/rsocket/util/DefaultPayloadTest.java | 30 ++++++++- 12 files changed, 197 insertions(+), 36 deletions(-) create mode 100644 rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java index d910fe92f..5e9b15b16 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java @@ -47,6 +47,13 @@ static ByteBuf encode( return allocator.compositeBuffer(3).addComponents(true, header, metadata, data); } + static ByteBuf encode(ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) { + + int length = metadata.readableBytes(); + encodeLength(header, length); + return allocator.compositeBuffer(2).addComponents(true, header, metadata); + } + static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { if (hasMetadata) { int length = decodeLength(byteBuf); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java index df8b308e9..d8efad72d 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java @@ -14,20 +14,25 @@ public static ByteBuf encode( @Nullable ByteBuf metadata, ByteBuf data) { + final boolean hasData = data != null && data.isReadable(); + final boolean hasMetadata = metadata != null && metadata.isReadable(); + int flags = FrameHeaderFlyweight.FLAGS_I; - if (metadata != null) { + if (hasMetadata) { flags |= FrameHeaderFlyweight.FLAGS_M; } - ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags); + final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags); header.writeInt(extendedType); - if (data == null && metadata == null) { - return header; - } else if (metadata != null) { + if (hasData && hasMetadata) { return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else { + } else if (hasMetadata) { + return DataAndMetadataFlyweight.encode(allocator, header, metadata); + } else if (hasData) { return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + } else { + return header; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java index 06efeab6c..b1aa6ff99 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java @@ -13,12 +13,17 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B public static ByteBuf encode( final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) { - if (data == null && metadata == null) { - return header; - } else if (metadata != null) { + final boolean hasData = data != null && data.isReadable(); + final boolean hasMetadata = metadata != null && metadata.isReadable(); + + if (hasData && hasMetadata) { return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else { + } else if (hasMetadata) { + return DataAndMetadataFlyweight.encode(allocator, header, metadata); + } else if (hasData) { return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + } else { + return header; } } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java index 4676f4c9d..e49ed88d1 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java @@ -13,18 +13,20 @@ public static ByteBuf encode( final int numRequests, @Nullable final ByteBuf metadata) { + final boolean hasMetadata = metadata != null && metadata.isReadable(); + int flags = 0; - if (metadata != null) { + if (hasMetadata) { flags |= FrameHeaderFlyweight.FLAGS_M; } - ByteBuf header = + final ByteBuf header = FrameHeaderFlyweight.encodeStreamZero(allocator, FrameType.LEASE, flags) .writeInt(ttl) .writeInt(numRequests); - if (metadata == null) { + if (!hasMetadata) { return header; } else { return DataAndMetadataFlyweight.encodeOnlyMetadata(allocator, header, metadata); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java index 98d862f36..f0a99317a 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java @@ -29,9 +29,13 @@ ByteBuf encode( int requestN, @Nullable ByteBuf metadata, ByteBuf data) { + + final boolean hasData = data != null && data.isReadable(); + final boolean hasMetadata = metadata != null && metadata.isReadable(); + int flags = 0; - if (metadata != null) { + if (hasMetadata) { flags |= FrameHeaderFlyweight.FLAGS_M; } @@ -47,18 +51,20 @@ ByteBuf encode( flags |= FrameHeaderFlyweight.FLAGS_N; } - ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, frameType, flags); + final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, frameType, flags); if (requestN > 0) { header.writeInt(requestN); } - if (data == null && metadata == null) { - return header; - } else if (metadata != null) { + if (hasData && hasMetadata) { return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else { + } else if (hasMetadata) { + return DataAndMetadataFlyweight.encode(allocator, header, metadata); + } else if (hasData) { return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + } else { + return header; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java index 9f92e715f..cfd0ca7bc 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java @@ -55,8 +55,10 @@ public static ByteBuf encode( final String dataMimeType, final Payload setupPayload) { - ByteBuf metadata = setupPayload.hasMetadata() ? setupPayload.sliceMetadata() : null; - ByteBuf data = setupPayload.sliceData(); + final ByteBuf data = setupPayload.sliceData(); + final boolean hasData = data.isReadable(); + final boolean hasMetadata = setupPayload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? setupPayload.sliceMetadata() : null; int flags = 0; @@ -68,11 +70,11 @@ public static ByteBuf encode( flags |= FLAGS_WILL_HONOR_LEASE; } - if (metadata != null) { + if (hasMetadata) { flags |= FrameHeaderFlyweight.FLAGS_M; } - ByteBuf header = FrameHeaderFlyweight.encodeStreamZero(allocator, FrameType.SETUP, flags); + final ByteBuf header = FrameHeaderFlyweight.encodeStreamZero(allocator, FrameType.SETUP, flags); header.writeInt(CURRENT_VERSION).writeInt(keepaliveInterval).writeInt(maxLifetime); @@ -91,12 +93,15 @@ public static ByteBuf encode( length = ByteBufUtil.utf8Bytes(dataMimeType); header.writeByte(length); ByteBufUtil.writeUtf8(header, dataMimeType); - if (data == null && metadata == null) { - return header; - } else if (metadata != null) { + + if (hasData && hasMetadata) { return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else { + } else if (hasMetadata) { + return DataAndMetadataFlyweight.encode(allocator, header, metadata); + } else if (hasData) { return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + } else { + return header; } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index b91cf8ac6..0eca6b8e4 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.AbstractReferenceCounted; +import io.netty.util.IllegalReferenceCountException; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.rsocket.Payload; @@ -112,9 +113,10 @@ public static Payload create(ByteBuf data) { public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { ByteBufPayload payload = RECYCLER.get(); - payload.setRefCnt(1); payload.data = data; payload.metadata = metadata; + // unsure data and metadata is set before refCnt change + payload.setRefCnt(1); return payload; } @@ -126,26 +128,31 @@ public static Payload create(Payload payload) { @Override public boolean hasMetadata() { - return metadata != null; + ensureAccessible(); + return metadata != null && metadata.isReadable(); } @Override public ByteBuf sliceMetadata() { + ensureAccessible(); return metadata == null ? Unpooled.EMPTY_BUFFER : metadata.slice(); } @Override public ByteBuf data() { + ensureAccessible(); return data; } @Override public ByteBuf metadata() { + ensureAccessible(); return metadata == null ? Unpooled.EMPTY_BUFFER : metadata; } @Override public ByteBuf sliceData() { + ensureAccessible(); return data.slice(); } @@ -163,6 +170,7 @@ public ByteBufPayload retain(int increment) { @Override public ByteBufPayload touch() { + ensureAccessible(); data.touch(); if (metadata != null) { metadata.touch(); @@ -172,6 +180,7 @@ public ByteBufPayload touch() { @Override public ByteBufPayload touch(Object hint) { + ensureAccessible(); data.touch(hint); if (metadata != null) { metadata.touch(hint); @@ -189,4 +198,22 @@ protected void deallocate() { } handle.recycle(this); } + + /** + * Should be called by every method that tries to access the buffers content to check if the + * buffer was released before. + */ + void ensureAccessible() { + if (!isAccessible()) { + throw new IllegalReferenceCountException(0); + } + } + + /** + * Used internally by {@link ByteBufPayload#ensureAccessible()} to try to guard against using the + * buffer after it was released (best-effort). + */ + boolean isAccessible() { + return refCnt() != 0; + } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index ec73399f1..765073dd7 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -110,7 +110,7 @@ public static Payload create(Payload payload) { @Override public boolean hasMetadata() { - return metadata != null; + return metadata != null && metadata.remaining() > 0; } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java index ea3142d25..90bb6e998 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java @@ -69,7 +69,7 @@ void testSetupPayloadWithEmptyMetadata() { ConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(frame); assertFalse(setupPayload.willClientHonorLease()); - assertTrue(setupPayload.hasMetadata()); + assertFalse(setupPayload.hasMetadata()); assertNotNull(setupPayload.metadata()); assertEquals(0, setupPayload.metadata().readableBytes()); assertEquals(payload.data(), setupPayload.data()); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java index 9acec2c81..a82c3e658 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java @@ -22,7 +22,12 @@ void testEncoding() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - + // Encoded Frame Length⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Metadata + // | | | | ⌌Encoded Data + // __|________|_________|_________|_____| + // ↓ ↓↓ ↓↓ ↓↓ ↓↓↓ assertEquals("000010000000011900000000010000026d6464", ByteBufUtil.hexDump(frame)); frame.release(); } @@ -39,8 +44,12 @@ void testEncodingWithEmptyMetadata() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - - assertEquals("00000e0000000119000000000100000064", ByteBufUtil.hexDump(frame)); + // Encoded Frame Length⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Data + // __|________|_________|_____| + // ↓ ↓↓ ↓↓ ↓↓↓ + assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); frame.release(); } @@ -57,6 +66,11 @@ void testEncodingWithNullMetadata() { frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); + // Encoded Frame Length⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Data + // __|________|_________|_____| + // ↓ ↓↓ ↓↓ ↓↓↓ assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); frame.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java new file mode 100644 index 000000000..09aa8f325 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java @@ -0,0 +1,64 @@ +package io.rsocket.util; + +import io.netty.buffer.Unpooled; +import io.netty.util.IllegalReferenceCountException; +import io.rsocket.Payload; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ByteBufPayloadTest { + + @Test + public void shouldIndicateThatItHasMetadata() { + Payload payload = ByteBufPayload.create("data", "metadata"); + + Assertions.assertThat(payload.hasMetadata()).isTrue(); + Assertions.assertThat(payload.release()).isTrue(); + } + + @Test + public void shouldIndicateThatItHasNotMetadata() { + Payload payload = ByteBufPayload.create("data"); + + Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.release()).isTrue(); + } + + @Test + public void shouldIndicateThatItHasNotMetadata1() { + Payload payload = + ByteBufPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); + + Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.release()).isTrue(); + } + + @Test + public void shouldThrowExceptionIfAccessAfterRelease() { + Payload payload = ByteBufPayload.create("data", "metadata"); + + Assertions.assertThat(payload.release()).isTrue(); + + Assertions.assertThatThrownBy(payload::hasMetadata) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::data).isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::metadata) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::sliceData) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::sliceMetadata) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::touch) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(() -> payload.touch("test")) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::getData) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::getMetadata) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::getDataUtf8) + .isInstanceOf(IllegalReferenceCountException.class); + Assertions.assertThatThrownBy(payload::getMetadataUtf8) + .isInstanceOf(IllegalReferenceCountException.class); + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java index 45ee4eacb..7ebcd1a74 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java @@ -16,10 +16,13 @@ package io.rsocket.util; -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import io.netty.buffer.Unpooled; import io.rsocket.Payload; +import java.nio.ByteBuffer; +import org.assertj.core.api.Assertions; import org.junit.Test; public class DefaultPayloadTest { @@ -48,4 +51,27 @@ public void staticMethods() { assertDataAndMetadata(DefaultPayload.create(DATA_VAL, METADATA_VAL), DATA_VAL, METADATA_VAL); assertDataAndMetadata(DefaultPayload.create(DATA_VAL), DATA_VAL, null); } + + @Test + public void shouldIndicateThatItHasNotMetadata() { + Payload payload = DefaultPayload.create("data"); + + Assertions.assertThat(payload.hasMetadata()).isFalse(); + } + + @Test + public void shouldIndicateThatItHasNotMetadata1() { + Payload payload = + DefaultPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); + + Assertions.assertThat(payload.hasMetadata()).isFalse(); + } + + @Test + public void shouldIndicateThatItHasNotMetadata2() { + Payload payload = + DefaultPayload.create(ByteBuffer.wrap("data".getBytes()), ByteBuffer.allocate(0)); + + Assertions.assertThat(payload.hasMetadata()).isFalse(); + } } From c93c456fe7c739f189763ec3af0fc20bd61ad32e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 21 Apr 2020 00:31:48 +0300 Subject: [PATCH 3/8] partial Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/frame/DataAndMetadataFlyweight.java | 5 ----- .../main/java/io/rsocket/frame/LeaseFrameFlyweight.java | 9 +++++---- .../src/main/java/io/rsocket/util/ByteBufPayload.java | 2 +- .../src/main/java/io/rsocket/util/DefaultPayload.java | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java index 5e9b15b16..41e74b469 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java @@ -30,11 +30,6 @@ private static int decodeLength(final ByteBuf byteBuf) { return length; } - static ByteBuf encodeOnlyMetadata( - ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) { - return allocator.compositeBuffer(2).addComponents(true, header, metadata); - } - static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) { return allocator.compositeBuffer(2).addComponents(true, header, data); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java index e49ed88d1..96737d7b3 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java @@ -13,7 +13,8 @@ public static ByteBuf encode( final int numRequests, @Nullable final ByteBuf metadata) { - final boolean hasMetadata = metadata != null && metadata.isReadable(); + final boolean hasMetadata = metadata != null; + final boolean writesMetadata = hasMetadata && metadata.isReadable(); int flags = 0; @@ -26,10 +27,10 @@ public static ByteBuf encode( .writeInt(ttl) .writeInt(numRequests); - if (!hasMetadata) { - return header; + if (writesMetadata) { + return allocator.compositeBuffer(2).addComponents(true, header, metadata); } else { - return DataAndMetadataFlyweight.encodeOnlyMetadata(allocator, header, metadata); + return header; } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index 0eca6b8e4..f5d747f7f 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -129,7 +129,7 @@ public static Payload create(Payload payload) { @Override public boolean hasMetadata() { ensureAccessible(); - return metadata != null && metadata.isReadable(); + return metadata != null; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index 765073dd7..ec73399f1 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -110,7 +110,7 @@ public static Payload create(Payload payload) { @Override public boolean hasMetadata() { - return metadata != null && metadata.remaining() > 0; + return metadata != null; } @Override From 6f74ae9452625dc878437b3dd394a17ddf64d03a Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 22 Apr 2020 20:28:47 +0300 Subject: [PATCH 4/8] provides refactoring related to ensuring that hasMetadata is propagated correctly Right now there was observed a few issues related to that Payload with no metadata was incorrectly incoded so it results to hasMetadata true on the received side Also, optimized the API of Flyweights to ensure that we have common things in one place Signed-off-by: Oleh Dokuka --- .../core/DefaultConnectionSetupPayload.java | 4 +- .../io/rsocket/core/RSocketRequester.java | 39 +---- .../io/rsocket/core/RSocketResponder.java | 33 ++++- .../fragmentation/FrameReassembler.java | 16 +- .../frame/DataAndMetadataFlyweight.java | 55 +++---- .../frame/ExtensionFrameFlyweight.java | 19 +-- .../rsocket/frame/FragmentationFlyweight.java | 14 +- .../frame/KeepAliveFrameFlyweight.java | 2 +- .../io/rsocket/frame/LeaseFrameFlyweight.java | 4 +- .../rsocket/frame/PayloadFrameFlyweight.java | 47 ++---- .../frame/RequestChannelFrameFlyweight.java | 30 ++-- .../RequestFireAndForgetFrameFlyweight.java | 10 +- .../io/rsocket/frame/RequestFlyweight.java | 23 ++- .../rsocket/frame/RequestNFrameFlyweight.java | 15 +- .../frame/RequestResponseFrameFlyweight.java | 10 +- .../frame/RequestStreamFrameFlyweight.java | 47 +++--- .../io/rsocket/frame/SetupFrameFlyweight.java | 16 +- .../frame/decoder/DefaultPayloadDecoder.java | 25 +++- .../frame/decoder/ZeroCopyPayloadDecoder.java | 11 +- .../core/ConnectionSetupPayloadTest.java | 2 +- .../io/rsocket/core/RSocketRequesterTest.java | 101 +++++++++++++ .../io/rsocket/core/RSocketResponderTest.java | 140 +++++++++++++++--- .../fragmentation/FrameFragmenterTest.java | 55 +++---- .../fragmentation/FrameReassemblerTest.java | 112 +++++++------- .../ReassembleDuplexConnectionTest.java | 48 +++--- .../frame/DataAndMetadataFlyweightTest.java | 51 ------- .../frame/ExtensionFrameFlyweightTest.java | 2 +- .../rsocket/frame/PayloadFlyweightTest.java | 15 +- .../rsocket/frame/RequestFlyweightTest.java | 61 ++++---- .../io/rsocket/util/ByteBufPayloadTest.java | 4 +- .../io/rsocket/util/DefaultPayloadTest.java | 8 +- 31 files changed, 572 insertions(+), 447 deletions(-) delete mode 100644 rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java index 8710aa61a..feeb5c481 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java @@ -17,6 +17,7 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.rsocket.ConnectionSetupPayload; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.SetupFrameFlyweight; @@ -40,7 +41,8 @@ public boolean hasMetadata() { @Override public ByteBuf sliceMetadata() { - return SetupFrameFlyweight.metadata(setupFrame); + final ByteBuf metadata = SetupFrameFlyweight.metadata(setupFrame); + return metadata == null ? Unpooled.EMPTY_BUFFER : metadata; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 4d7d47f6a..c3cbbdfa6 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -212,12 +212,7 @@ private Mono handleFireAndForget(Payload payload) { return UnicastMonoEmpty.newInstance( () -> { ByteBuf requestFrame = - RequestFireAndForgetFrameFlyweight.encode( - allocator, - streamId, - false, - payload.hasMetadata() ? payload.sliceMetadata().retain() : null, - payload.sliceData().retain()); + RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); payload.release(); sendProcessor.onNext(requestFrame); @@ -245,12 +240,7 @@ private Mono handleRequestResponse(final Payload payload) { @Override public void doOnSubscribe() { final ByteBuf requestFrame = - RequestResponseFrameFlyweight.encode( - allocator, - streamId, - false, - payload.sliceMetadata().retain(), - payload.sliceData().retain()); + RequestResponseFrameFlyweight.encode(allocator, streamId, payload); payload.release(); sendProcessor.onNext(requestFrame); @@ -302,15 +292,9 @@ private Flux handleRequestStream(final Payload payload) { public void accept(long n) { if (firstRequest && !receiver.isDisposed()) { firstRequest = false; - sendProcessor.onNext( - RequestStreamFrameFlyweight.encode( - allocator, - streamId, - false, - n, - payload.sliceMetadata().retain(), - payload.sliceData().retain())); if (!payloadReleasedFlag.getAndSet(true)) { + sendProcessor.onNext( + RequestStreamFrameFlyweight.encode(allocator, streamId, n, payload)); payload.release(); } } else if (contains(streamId) && !receiver.isDisposed()) { @@ -399,8 +383,7 @@ protected void hookOnNext(Payload payload) { receiver.onError(t); return; } - final ByteBuf frame = - PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload); + final ByteBuf frame = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload); sendProcessor.onNext(frame); payload.release(); @@ -445,13 +428,7 @@ public void accept(long n) { if (!payloadReleasedFlag.getAndSet(true)) { ByteBuf frame = RequestChannelFrameFlyweight.encode( - allocator, - streamId, - false, - false, - n, - initialPayload.sliceMetadata().retain(), - initialPayload.sliceData().retain()); + allocator, streamId, false, n, initialPayload); sendProcessor.onNext(frame); @@ -604,8 +581,8 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { { Subscription sender = senders.get(streamId); if (sender != null) { - int n = RequestNFrameFlyweight.requestN(frame); - sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); + long n = RequestNFrameFlyweight.requestN(frame); + sender.request(n); } break; } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index c41f99ff5..3860b8dae 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -35,6 +35,7 @@ import io.rsocket.internal.UnboundedProcessor; import io.rsocket.lease.ResponderLeaseHandler; import java.util.function.Consumer; +import java.util.function.LongConsumer; import javax.annotation.Nullable; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; @@ -300,12 +301,12 @@ private void handleFrame(ByteBuf frame) { handleRequestN(streamId, frame); break; case REQUEST_STREAM: - int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); + long streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); Payload streamPayload = payloadDecoder.apply(frame); handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null); break; case REQUEST_CHANNEL: - int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); + long channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); Payload channelPayload = payloadDecoder.apply(frame); handleChannel(streamId, channelPayload, channelInitialRequestN); break; @@ -436,14 +437,14 @@ protected void hookFinally(SignalType type) { private void handleStream( int streamId, Flux response, - int initialRequestN, + long initialRequestN, @Nullable UnicastProcessor requestChannel) { final BaseSubscriber subscriber = new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription s) { - s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); + s.request(initialRequestN); } @Override @@ -522,14 +523,30 @@ protected void hookFinally(SignalType type) { .subscribe(subscriber); } - private void handleChannel(int streamId, Payload payload, int initialRequestN) { + private void handleChannel(int streamId, Payload payload, long initialRequestN) { UnicastProcessor frames = UnicastProcessor.create(); channelProcessors.put(streamId, frames); Flux payloads = frames .doOnRequest( - l -> sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, l))) + new LongConsumer() { + boolean first = true; + + @Override + public void accept(long l) { + long n; + if (first) { + first = false; + n = l - 1L; + } else { + n = l; + } + if (n > 0) { + sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); + } + } + }) .doFinally( signalType -> { if (channelProcessors.remove(streamId, frames)) { @@ -585,8 +602,8 @@ private void handleRequestN(int streamId, ByteBuf frame) { Subscription subscription = sendingSubscriptions.get(streamId); if (subscription != null) { - int n = RequestNFrameFlyweight.requestN(frame); - subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n); + long n = RequestNFrameFlyweight.requestN(frame); + subscription.request(n); } } } diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index d8537ec1a..1a8d242b2 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -166,8 +166,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) { header = frame.copy(frame.readerIndex(), FrameHeaderFlyweight.size()); if (frameType == FrameType.REQUEST_CHANNEL || frameType == FrameType.REQUEST_STREAM) { - int i = RequestChannelFrameFlyweight.initialRequestN(frame); - header.writeInt(i); + long i = RequestChannelFrameFlyweight.initialRequestN(frame); + header.writeInt(i > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) i); } putHeader(streamId, header); } @@ -261,10 +261,16 @@ void reassembleFrame(ByteBuf frame, SynchronousSink sink) { private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf header) { ByteBuf metadata; CompositeByteBuf cm = removeMetadata(streamId); - if (cm != null) { - metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain()); + + ByteBuf decodedMetadata = PayloadFrameFlyweight.metadata(frame); + if (decodedMetadata != null) { + if (cm != null) { + metadata = cm.addComponents(true, decodedMetadata.retain()); + } else { + metadata = PayloadFrameFlyweight.metadata(frame).retain(); + } } else { - metadata = PayloadFrameFlyweight.metadata(frame).retain(); + metadata = cm != null ? cm : null; } ByteBuf data = assembleData(frame, streamId); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java index 41e74b469..ea54aa374 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java @@ -30,40 +30,35 @@ private static int decodeLength(final ByteBuf byteBuf) { return length; } - static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) { - return allocator.compositeBuffer(2).addComponents(true, header, data); - } - static ByteBuf encode( - ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) { - - int length = metadata.readableBytes(); - encodeLength(header, length); - return allocator.compositeBuffer(3).addComponents(true, header, metadata, data); - } - - static ByteBuf encode(ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) { + ByteBufAllocator allocator, + final ByteBuf header, + ByteBuf metadata, + boolean hasMetadata, + ByteBuf data) { - int length = metadata.readableBytes(); - encodeLength(header, length); - return allocator.compositeBuffer(2).addComponents(true, header, metadata); - } + final boolean addData = data != null && data.isReadable(); + final boolean addMetadata = hasMetadata && metadata.isReadable(); - static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { if (hasMetadata) { - int length = decodeLength(byteBuf); - return byteBuf.readSlice(length); + int length = metadata.readableBytes(); + encodeLength(header, length); + } + + if (addMetadata && addData) { + return allocator.compositeBuffer(3).addComponents(true, header, metadata, data); + } else if (addMetadata) { + return allocator.compositeBuffer(2).addComponents(true, header, metadata); + } else if (addData) { + return allocator.compositeBuffer(2).addComponents(true, header, data); } else { - return Unpooled.EMPTY_BUFFER; + return header; } } - static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) { - byteBuf.markReaderIndex(); - byteBuf.skipBytes(6); - ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata); - byteBuf.resetReaderIndex(); - return metadata; + static ByteBuf metadataWithoutMarking(ByteBuf byteBuf) { + int length = decodeLength(byteBuf); + return byteBuf.readSlice(length); } static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { @@ -78,12 +73,4 @@ static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { return Unpooled.EMPTY_BUFFER; } } - - static ByteBuf data(ByteBuf byteBuf, boolean hasMetadata) { - byteBuf.markReaderIndex(); - byteBuf.skipBytes(6); - ByteBuf data = dataWithoutMarking(byteBuf, hasMetadata); - byteBuf.resetReaderIndex(); - return data; - } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java index d8efad72d..8cb01b08f 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java @@ -14,8 +14,7 @@ public static ByteBuf encode( @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); + final boolean hasMetadata = metadata != null; int flags = FrameHeaderFlyweight.FLAGS_I; @@ -25,15 +24,8 @@ public static ByteBuf encode( final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags); header.writeInt(extendedType); - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } public static int extendedType(ByteBuf byteBuf) { @@ -61,10 +53,13 @@ public static ByteBuf metadata(ByteBuf byteBuf) { FrameHeaderFlyweight.ensureFrameType(FrameType.EXT, byteBuf); boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); // Extended type byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java index b1aa6ff99..a91d52782 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java @@ -13,17 +13,7 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B public static ByteBuf encode( final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); - - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + final boolean hasMetadata = metadata != null; + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java index e4e6029b3..b591412a6 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java @@ -29,7 +29,7 @@ public static ByteBuf encode( header.writeLong(lp); - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); + return DataAndMetadataFlyweight.encode(allocator, header, null, false, data); } public static boolean respondFlag(ByteBuf byteBuf) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java index 96737d7b3..039c72886 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java @@ -14,7 +14,7 @@ public static ByteBuf encode( @Nullable final ByteBuf metadata) { final boolean hasMetadata = metadata != null; - final boolean writesMetadata = hasMetadata && metadata.isReadable(); + final boolean addMetadata = hasMetadata && metadata.isReadable(); int flags = 0; @@ -27,7 +27,7 @@ public static ByteBuf encode( .writeInt(ttl) .writeInt(numRequests); - if (writesMetadata) { + if (addMetadata) { return allocator.compositeBuffer(2).addComponents(true, header, metadata); } else { return header; diff --git a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java index 4f67d9c72..1b5204036 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java @@ -21,47 +21,24 @@ public static ByteBuf encode( allocator, streamId, fragmentFollows, complete, next, 0, metadata, data); } - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - boolean complete, - boolean next, - Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - fragmentFollows, - complete, - next, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { + return encode(allocator, streamId, false, payload); } public static ByteBuf encodeNextComplete( ByteBufAllocator allocator, int streamId, Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - false, - true, - true, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + + return encode(allocator, streamId, true, payload); } - public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { - return FLYWEIGHT.encode( - allocator, - streamId, - false, - false, - true, - 0, - payload.hasMetadata() ? payload.metadata().retain() : null, - payload.data().retain()); + static ByteBuf encode( + ByteBufAllocator allocator, int streamId, boolean complete, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); + + return FLYWEIGHT.encode(allocator, streamId, false, complete, true, 0, metadata, data); } public static ByteBuf encodeComplete(ByteBufAllocator allocator, int streamId) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java index 06ddcda03..2a644b07a 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java @@ -13,22 +13,15 @@ private RequestChannelFrameFlyweight() {} public static ByteBuf encode( ByteBufAllocator allocator, int streamId, - boolean fragmentFollows, boolean complete, - long requestN, + long initialRequestN, Payload payload) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - return FLYWEIGHT.encode( - allocator, - streamId, - fragmentFollows, - complete, - false, - reqN, - payload.metadata(), - payload.data()); + return encode(allocator, streamId, false, complete, initialRequestN, metadata, data); } public static ByteBuf encode( @@ -36,11 +29,15 @@ public static ByteBuf encode( int streamId, boolean fragmentFollows, boolean complete, - long requestN, + long initialRequestN, ByteBuf metadata, ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + if (initialRequestN < 1) { + throw new IllegalArgumentException("request n is less than 1"); + } + + int reqN = initialRequestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) initialRequestN; return FLYWEIGHT.encode( allocator, streamId, fragmentFollows, complete, false, reqN, metadata, data); @@ -54,7 +51,8 @@ public static ByteBuf metadata(ByteBuf byteBuf) { return FLYWEIGHT.metadataWithRequestN(byteBuf); } - public static int initialRequestN(ByteBuf byteBuf) { - return FLYWEIGHT.initialRequestN(byteBuf); + public static long initialRequestN(ByteBuf byteBuf) { + int requestN = FLYWEIGHT.initialRequestN(byteBuf); + return requestN == Integer.MAX_VALUE ? Long.MAX_VALUE : requestN; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java index 5f2d606e4..13b7d907d 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java @@ -10,11 +10,13 @@ public class RequestFireAndForgetFrameFlyweight { private RequestFireAndForgetFrameFlyweight() {} - public static ByteBuf encode( - ByteBufAllocator allocator, int streamId, boolean fragmentFollows, Payload payload) { + public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - return FLYWEIGHT.encode( - allocator, streamId, fragmentFollows, payload.metadata(), payload.data()); + return FLYWEIGHT.encode(allocator, streamId, false, metadata, data); } public static ByteBuf encode( diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java index f0a99317a..15fac9f55 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java @@ -30,8 +30,7 @@ ByteBuf encode( @Nullable ByteBuf metadata, ByteBuf data) { - final boolean hasData = data != null && data.isReadable(); - final boolean hasMetadata = metadata != null && metadata.isReadable(); + final boolean hasMetadata = metadata != null; int flags = 0; @@ -57,15 +56,7 @@ ByteBuf encode( header.writeInt(requestN); } - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } ByteBuf data(ByteBuf byteBuf) { @@ -79,9 +70,12 @@ ByteBuf data(ByteBuf byteBuf) { ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size()); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } @@ -97,9 +91,12 @@ ByteBuf dataWithRequestN(ByteBuf byteBuf) { ByteBuf metadataWithRequestN(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java index 5a4c4c273..fe2c752cf 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestNFrameFlyweight.java @@ -8,26 +8,23 @@ private RequestNFrameFlyweight() {} public static ByteBuf encode( final ByteBufAllocator allocator, final int streamId, long requestN) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; - return encode(allocator, streamId, reqN); - } - - public static ByteBuf encode(final ByteBufAllocator allocator, final int streamId, int requestN) { - ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.REQUEST_N, 0); if (requestN < 1) { throw new IllegalArgumentException("request n is less than 1"); } - return header.writeInt(requestN); + int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; + + ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.REQUEST_N, 0); + return header.writeInt(reqN); } - public static int requestN(ByteBuf byteBuf) { + public static long requestN(ByteBuf byteBuf) { FrameHeaderFlyweight.ensureFrameType(FrameType.REQUEST_N, byteBuf); byteBuf.markReaderIndex(); byteBuf.skipBytes(FrameHeaderFlyweight.size()); int i = byteBuf.readInt(); byteBuf.resetReaderIndex(); - return i; + return i == Integer.MAX_VALUE ? Long.MAX_VALUE : i; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java index 2e06c9b82..d328c7fa2 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java @@ -10,9 +10,13 @@ public class RequestResponseFrameFlyweight { private RequestResponseFrameFlyweight() {} - public static ByteBuf encode( - ByteBufAllocator allocator, int streamId, boolean fragmentFollows, Payload payload) { - return encode(allocator, streamId, fragmentFollows, payload.metadata(), payload.data()); + public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); + + return encode(allocator, streamId, false, metadata, data); } public static ByteBuf encode( diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java index 171c41990..1c06d80c7 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java @@ -11,45 +11,31 @@ public class RequestStreamFrameFlyweight { private RequestStreamFrameFlyweight() {} public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - long requestN, - Payload payload) { - return encode( - allocator, streamId, fragmentFollows, requestN, payload.metadata(), payload.data()); - } + ByteBufAllocator allocator, int streamId, long initialRequestN, Payload payload) { - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - int requestN, - Payload payload) { - return encode( - allocator, streamId, fragmentFollows, requestN, payload.metadata(), payload.data()); - } + final boolean hasMetadata = payload.hasMetadata(); + final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; + final ByteBuf data = payload.data().retain(); - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - long requestN, - ByteBuf metadata, - ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; - return encode(allocator, streamId, fragmentFollows, reqN, metadata, data); + return encode(allocator, streamId, false, initialRequestN, metadata, data); } public static ByteBuf encode( ByteBufAllocator allocator, int streamId, boolean fragmentFollows, - int requestN, + long initialRequestN, ByteBuf metadata, ByteBuf data) { + + if (initialRequestN < 1) { + throw new IllegalArgumentException("request n is less than 1"); + } + + int reqN = initialRequestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) initialRequestN; + return FLYWEIGHT.encode( - allocator, streamId, fragmentFollows, false, false, requestN, metadata, data); + allocator, streamId, fragmentFollows, false, false, reqN, metadata, data); } public static ByteBuf data(ByteBuf byteBuf) { @@ -60,7 +46,8 @@ public static ByteBuf metadata(ByteBuf byteBuf) { return FLYWEIGHT.metadataWithRequestN(byteBuf); } - public static int initialRequestN(ByteBuf byteBuf) { - return FLYWEIGHT.initialRequestN(byteBuf); + public static long initialRequestN(ByteBuf byteBuf) { + int requestN = FLYWEIGHT.initialRequestN(byteBuf); + return requestN == Integer.MAX_VALUE ? Long.MAX_VALUE : requestN; } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java index cfd0ca7bc..bfb73fe22 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java @@ -56,7 +56,6 @@ public static ByteBuf encode( final Payload setupPayload) { final ByteBuf data = setupPayload.sliceData(); - final boolean hasData = data.isReadable(); final boolean hasMetadata = setupPayload.hasMetadata(); final ByteBuf metadata = hasMetadata ? setupPayload.sliceMetadata() : null; @@ -94,15 +93,7 @@ public static ByteBuf encode( header.writeByte(length); ByteBufUtil.writeUtf8(header, dataMimeType); - if (hasData && hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata, data); - } else if (hasMetadata) { - return DataAndMetadataFlyweight.encode(allocator, header, metadata); - } else if (hasData) { - return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data); - } else { - return header; - } + return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data); } public static int version(ByteBuf byteBuf) { @@ -197,9 +188,12 @@ public static String dataMimeType(ByteBuf byteBuf) { public static ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); + if (!hasMetadata) { + return null; + } byteBuf.markReaderIndex(); skipToPayload(byteBuf); - ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); + ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf); byteBuf.resetReaderIndex(); return metadata; } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java index 692dcb363..0a77e3820 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java @@ -3,8 +3,15 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.Payload; -import io.rsocket.frame.*; -import io.rsocket.util.ByteBufPayload; +import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.MetadataPushFrameFlyweight; +import io.rsocket.frame.PayloadFrameFlyweight; +import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; +import io.rsocket.frame.RequestResponseFrameFlyweight; +import io.rsocket.frame.RequestStreamFrameFlyweight; +import io.rsocket.util.DefaultPayload; import java.nio.ByteBuffer; /** Default Frame decoder that copies the frames contents for easy of use. */ @@ -45,14 +52,18 @@ public Payload apply(ByteBuf byteBuf) { throw new IllegalArgumentException("unsupported frame type: " + type); } - ByteBuffer metadata = ByteBuffer.allocateDirect(m.readableBytes()); ByteBuffer data = ByteBuffer.allocateDirect(d.readableBytes()); - data.put(d.nioBuffer()); data.flip(); - metadata.put(m.nioBuffer()); - metadata.flip(); - return ByteBufPayload.create(data, metadata); + if (m != null) { + ByteBuffer metadata = ByteBuffer.allocateDirect(m.readableBytes()); + metadata.put(m.nioBuffer()); + metadata.flip(); + + return DefaultPayload.create(data, metadata); + } + + return DefaultPayload.create(data); } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java index 0b63590e8..c92f82428 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java @@ -3,7 +3,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.Payload; -import io.rsocket.frame.*; +import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.MetadataPushFrameFlyweight; +import io.rsocket.frame.PayloadFrameFlyweight; +import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; +import io.rsocket.frame.RequestResponseFrameFlyweight; +import io.rsocket.frame.RequestStreamFrameFlyweight; import io.rsocket.util.ByteBufPayload; /** @@ -46,6 +53,6 @@ public Payload apply(ByteBuf byteBuf) { throw new IllegalArgumentException("unsupported frame type: " + type); } - return ByteBufPayload.create(d.retain(), m.retain()); + return ByteBufPayload.create(d.retain(), m != null ? m.retain() : null); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java index 90bb6e998..ea3142d25 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java @@ -69,7 +69,7 @@ void testSetupPayloadWithEmptyMetadata() { ConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(frame); assertFalse(setupPayload.willClientHonorLease()); - assertFalse(setupPayload.hasMetadata()); + assertTrue(setupPayload.hasMetadata()); assertNotNull(setupPayload.metadata()); assertEquals(0, setupPayload.metadata().readableBytes()); assertEquals(payload.data(), setupPayload.data()); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 36abb14b4..da2755cc3 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -20,6 +20,7 @@ import static io.rsocket.frame.FrameHeaderFlyweight.frameType; import static io.rsocket.frame.FrameType.CANCEL; import static io.rsocket.frame.FrameType.REQUEST_CHANNEL; +import static io.rsocket.frame.FrameType.REQUEST_FNF; import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.hamcrest.MatcherAssert.assertThat; @@ -35,6 +36,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCounted; import io.rsocket.Payload; @@ -74,7 +76,9 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.runners.model.Statement; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -625,6 +629,103 @@ public void simpleOnDiscardRequestChannelTest2() { rule.assertHasNoLeaks(); } + @ParameterizedTest + @MethodSource("encodeDecodePayloadCases") + public void verifiesThatFrameWithNoMetadataHasDecodedCorrectlyIntoPayload( + FrameType frameType, int framesCnt, int responsesCnt) { + ByteBufAllocator allocator = rule.alloc(); + AssertSubscriber assertSubscriber = AssertSubscriber.create(responsesCnt); + TestPublisher testPublisher = TestPublisher.create(); + + Publisher response; + + switch (frameType) { + case REQUEST_FNF: + response = + testPublisher.mono().flatMap(p -> rule.socket.fireAndForget(p).then(Mono.empty())); + break; + case REQUEST_RESPONSE: + response = testPublisher.mono().flatMap(p -> rule.socket.requestResponse(p)); + break; + case REQUEST_STREAM: + response = testPublisher.mono().flatMapMany(p -> rule.socket.requestStream(p)); + break; + case REQUEST_CHANNEL: + response = rule.socket.requestChannel(testPublisher.flux()); + break; + default: + throw new UnsupportedOperationException("illegal case"); + } + + response.subscribe(assertSubscriber); + testPublisher.next(ByteBufPayload.create("d")); + + int streamId = rule.getStreamIdForRequestType(frameType); + + if (responsesCnt > 0) { + for (int i = 0; i < responsesCnt - 1; i++) { + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + streamId, + false, + false, + true, + null, + Unpooled.wrappedBuffer(("rd" + (i + 1)).getBytes()))); + } + + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + streamId, + false, + true, + true, + null, + Unpooled.wrappedBuffer(("rd" + responsesCnt).getBytes()))); + } + + if (framesCnt > 1) { + rule.connection.addToReceivedBuffer( + RequestNFrameFlyweight.encode(allocator, streamId, framesCnt)); + } + + for (int i = 1; i < framesCnt; i++) { + testPublisher.next(ByteBufPayload.create("d" + i)); + } + + Assertions.assertThat(rule.connection.getSent()) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames sent", frameType, framesCnt) + .hasSize(framesCnt) + .allMatch(bb -> !FrameHeaderFlyweight.hasMetadata(bb)) + .allMatch(ByteBuf::release); + + Assertions.assertThat(assertSubscriber.isTerminated()) + .describedAs("Interaction Type :[%s]. Expected to be terminated", frameType) + .isTrue(); + + Assertions.assertThat(assertSubscriber.values()) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames received", + frameType, responsesCnt) + .hasSize(responsesCnt) + .allMatch(p -> !p.hasMetadata()) + .allMatch(p -> p.release()); + + rule.assertHasNoLeaks(); + rule.connection.clearSendReceiveBuffers(); + } + + static Stream encodeDecodePayloadCases() { + return Stream.of( + Arguments.of(REQUEST_FNF, 1, 0), + Arguments.of(REQUEST_RESPONSE, 1, 1), + Arguments.of(REQUEST_STREAM, 1, 5), + Arguments.of(REQUEST_CHANNEL, 5, 5)); + } + public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 05f9fd46e..726be0770 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -19,6 +19,8 @@ import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE; import static io.rsocket.frame.FrameHeaderFlyweight.frameType; import static io.rsocket.frame.FrameType.REQUEST_CHANNEL; +import static io.rsocket.frame.FrameType.REQUEST_FNF; +import static io.rsocket.frame.FrameType.REQUEST_N; import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.hamcrest.MatcherAssert.assertThat; @@ -45,6 +47,7 @@ import io.rsocket.frame.KeepAliveFrameFlyweight; import io.rsocket.frame.PayloadFrameFlyweight; import io.rsocket.frame.RequestChannelFrameFlyweight; +import io.rsocket.frame.RequestFireAndForgetFrameFlyweight; import io.rsocket.frame.RequestNFrameFlyweight; import io.rsocket.frame.RequestResponseFrameFlyweight; import io.rsocket.frame.RequestStreamFrameFlyweight; @@ -55,16 +58,21 @@ import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; +import io.rsocket.util.EmptyPayload; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.runners.model.Statement; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -78,6 +86,7 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.test.publisher.TestPublisher; import reactor.test.util.RaceTestUtils; public class RSocketResponderTest { @@ -605,6 +614,108 @@ public Flux requestChannel(Publisher payloads) { rule.assertHasNoLeaks(); } + @ParameterizedTest + @MethodSource("encodeDecodePayloadCases") + public void verifiesThatFrameWithNoMetadataHasDecodedCorrectlyIntoPayload( + FrameType frameType, int framesCnt, int responsesCnt) { + ByteBufAllocator allocator = rule.alloc(); + AssertSubscriber assertSubscriber = AssertSubscriber.create(framesCnt); + TestPublisher testPublisher = TestPublisher.create(); + + rule.setAcceptingSocket( + new AbstractRSocket() { + @Override + public Mono fireAndForget(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return Mono.empty(); + } + + @Override + public Mono requestResponse(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return testPublisher.mono(); + } + + @Override + public Flux requestStream(Payload payload) { + Mono.just(payload).subscribe(assertSubscriber); + return testPublisher.flux(); + } + + @Override + public Flux requestChannel(Publisher payloads) { + payloads.subscribe(assertSubscriber); + return testPublisher.flux(); + } + }, + 1); + + rule.sendRequest(1, frameType, ByteBufPayload.create("d")); + + // if responses number is bigger than 1 we have to send one extra requestN + if (responsesCnt > 1) { + rule.connection.addToReceivedBuffer( + RequestNFrameFlyweight.encode(allocator, 1, responsesCnt - 1)); + } + + // respond with specific number of elements + for (int i = 0; i < responsesCnt; i++) { + testPublisher.next(ByteBufPayload.create("rd" + i)); + } + + // Listen to incoming frames. Valid for RequestChannel case only + if (framesCnt > 1) { + for (int i = 1; i < responsesCnt; i++) { + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encode( + allocator, + 1, + false, + false, + true, + null, + Unpooled.wrappedBuffer(("d" + (i + 1)).getBytes()))); + } + } + + if (responsesCnt > 0) { + Assertions.assertThat( + rule.connection.getSent().stream().filter(bb -> frameType(bb) != REQUEST_N)) + .describedAs( + "Interaction Type :[%s]. Expected to observe %s frames sent", frameType, responsesCnt) + .hasSize(responsesCnt) + .allMatch(bb -> !FrameHeaderFlyweight.hasMetadata(bb)); + } + + if (framesCnt > 1) { + Assertions.assertThat( + rule.connection.getSent().stream().filter(bb -> frameType(bb) == REQUEST_N)) + .describedAs( + "Interaction Type :[%s]. Expected to observe single RequestN(%s) frame", + frameType, framesCnt - 1) + .hasSize(1) + .first() + .matches(bb -> RequestNFrameFlyweight.requestN(bb) == (framesCnt - 1)); + } + + Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); + + Assertions.assertThat(assertSubscriber.awaitAndAssertNextValueCount(framesCnt).values()) + .hasSize(framesCnt) + .allMatch(p -> !p.hasMetadata()) + .allMatch(ReferenceCounted::release); + + rule.assertHasNoLeaks(); + } + + static Stream encodeDecodePayloadCases() { + return Stream.of( + Arguments.of(REQUEST_FNF, 1, 0), + Arguments.of(REQUEST_RESPONSE, 1, 1), + Arguments.of(REQUEST_STREAM, 1, 5), + Arguments.of(REQUEST_CHANNEL, 5, 5)); + } + public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket; @@ -653,34 +764,25 @@ protected RSocketResponder newRSocket(LeaksTrackingByteBufAllocator allocator) { } private void sendRequest(int streamId, FrameType frameType) { + sendRequest(streamId, frameType, EmptyPayload.INSTANCE); + } + + private void sendRequest(int streamId, FrameType frameType, Payload payload) { ByteBuf request; switch (frameType) { case REQUEST_CHANNEL: request = - RequestChannelFrameFlyweight.encode( - allocator, - streamId, - false, - false, - prefetch, - Unpooled.EMPTY_BUFFER, - Unpooled.EMPTY_BUFFER); + RequestChannelFrameFlyweight.encode(allocator, streamId, false, prefetch, payload); break; case REQUEST_STREAM: - request = - RequestStreamFrameFlyweight.encode( - allocator, - streamId, - false, - prefetch, - Unpooled.EMPTY_BUFFER, - Unpooled.EMPTY_BUFFER); + request = RequestStreamFrameFlyweight.encode(allocator, streamId, prefetch, payload); break; case REQUEST_RESPONSE: - request = - RequestResponseFrameFlyweight.encode( - allocator, streamId, false, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); + request = RequestResponseFrameFlyweight.encode(allocator, streamId, payload); + break; + case REQUEST_FNF: + request = RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); break; default: throw new IllegalArgumentException("unsupported type: " + frameType); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java index f5a013357..c6b1735e6 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.frame.*; -import io.rsocket.util.DefaultPayload; import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; import org.junit.jupiter.api.DisplayName; @@ -43,14 +42,17 @@ final class FrameFragmenterTest { @Test void testGettingData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf fnf = - RequestFireAndForgetFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestFireAndForgetFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf rs = - RequestStreamFrameFlyweight.encode(allocator, 1, true, 1, DefaultPayload.create(data)); + RequestStreamFrameFlyweight.encode( + allocator, 1, true, 1, null, Unpooled.wrappedBuffer(data)); ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 1, DefaultPayload.create(data)); + allocator, 1, true, false, 1, null, Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getData(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.wrappedBuffer(data)); @@ -73,16 +75,22 @@ void testGettingData() { void testGettingMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf fnf = RequestFireAndForgetFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf rs = RequestStreamFrameFlyweight.encode( - allocator, 1, true, 1, DefaultPayload.create(data, metadata)); + allocator, 1, true, 1, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 1, DefaultPayload.create(data, metadata)); + allocator, + 1, + true, + false, + 1, + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getMetadata(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.wrappedBuffer(metadata)); @@ -104,7 +112,8 @@ void testGettingMetadata() { @Test void returnEmptBufferWhenNoMetadataPresent() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf data = FrameFragmenter.getMetadata(rr, FrameType.REQUEST_RESPONSE); Assert.assertEquals(data, Unpooled.EMPTY_BUFFER); @@ -115,7 +124,8 @@ void returnEmptBufferWhenNoMetadataPresent() { @Test void encodeFirstFrameWithData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -144,7 +154,7 @@ void encodeFirstFrameWithData() { void encodeFirstWithDataChannel() { ByteBuf rc = RequestChannelFrameFlyweight.encode( - allocator, 1, true, false, 10, DefaultPayload.create(data)); + allocator, 1, true, false, 10, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -173,7 +183,8 @@ void encodeFirstWithDataChannel() { @Test void encodeFirstWithDataStream() { ByteBuf rc = - RequestStreamFrameFlyweight.encode(allocator, 1, true, 50, DefaultPayload.create(data)); + RequestStreamFrameFlyweight.encode( + allocator, 1, true, 50, null, Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -203,10 +214,7 @@ void encodeFirstWithDataStream() { void encodeFirstFrameWithMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -234,7 +242,7 @@ void encodeFirstFrameWithMetadata() { void encodeFirstWithDataAndMetadataStream() { ByteBuf rc = RequestStreamFrameFlyweight.encode( - allocator, 1, true, 50, DefaultPayload.create(data, metadata)); + allocator, 1, true, 50, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); ByteBuf fragment = FrameFragmenter.encodeFirstFragment( @@ -266,7 +274,8 @@ void encodeFirstWithDataAndMetadataStream() { @Test void fragmentData() { ByteBuf rr = - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)); + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); @@ -293,11 +302,7 @@ void fragmentData() { void fragmentMetadata() { ByteBuf rr = RequestStreamFrameFlyweight.encode( - allocator, - 1, - true, - 10, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))); + allocator, 1, true, 10, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_STREAM, false); @@ -324,7 +329,7 @@ void fragmentMetadata() { void fragmentDataAndMetadata() { ByteBuf rr = RequestResponseFrameFlyweight.encode( - allocator, 1, true, DefaultPayload.create(data, metadata)); + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.wrappedBuffer(data)); Publisher fragments = FrameFragmenter.fragmentFrame(allocator, 1024, rr, FrameType.REQUEST_RESPONSE, false); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java index 6e0d0dc1b..13632165b 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java @@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import io.rsocket.frame.*; -import io.rsocket.util.DefaultPayload; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -48,15 +47,16 @@ final class FrameReassemblerTest { void reassembleData() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)), + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -88,7 +88,8 @@ void reassembleData() { void passthrough() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, false, DefaultPayload.create(data))); + RequestResponseFrameFlyweight.encode( + allocator, 1, false, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -115,38 +116,39 @@ void reassembleMetadata() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -184,35 +186,40 @@ void reassembleMetadataChannel() { true, false, 100, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -249,39 +256,39 @@ void reassembleMetadataStream() { List byteBufs = Arrays.asList( RequestStreamFrameFlyweight.encode( - allocator, - 1, - true, - 250, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, 250, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -319,34 +326,33 @@ void reassembleMetadataAndData() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); @@ -387,32 +393,31 @@ public void cancelBeforeAssembling() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); Flux.fromIterable(byteBufs).handle(reassembler::reassembleFrame).blockLast(); @@ -436,32 +441,31 @@ public void dispose() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data))); FrameReassembler reassembler = new FrameReassembler(allocator); Flux.fromIterable(byteBufs).handle(reassembler::reassembleFrame).blockLast(); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java index b21a7c9da..c5abce339 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java @@ -30,7 +30,6 @@ import io.rsocket.frame.FrameType; import io.rsocket.frame.PayloadFrameFlyweight; import io.rsocket.frame.RequestResponseFrameFlyweight; -import io.rsocket.util.DefaultPayload; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -59,15 +58,16 @@ final class ReassembleDuplexConnectionTest { void reassembleData() { List byteBufs = Arrays.asList( - RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)), + RequestResponseFrameFlyweight.encode( + allocator, 1, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, true, false, true, DefaultPayload.create(data)), + allocator, 1, true, false, true, null, Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); CompositeByteBuf data = allocator @@ -99,38 +99,39 @@ void reassembleMetadata() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, false, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata)))); + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER)); CompositeByteBuf metadata = allocator @@ -164,34 +165,33 @@ void reassembleMetadataAndData() { List byteBufs = Arrays.asList( RequestResponseFrameFlyweight.encode( - allocator, - 1, - true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.EMPTY_BUFFER), PayloadFrameFlyweight.encode( allocator, 1, true, false, true, - DefaultPayload.create( - Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))), + Unpooled.wrappedBuffer(metadata), + Unpooled.wrappedBuffer(data)), PayloadFrameFlyweight.encode( - allocator, 1, false, false, true, DefaultPayload.create(data))); + allocator, 1, false, false, true, null, Unpooled.wrappedBuffer(data))); CompositeByteBuf data = allocator @@ -230,7 +230,7 @@ void reassembleMetadataAndData() { void reassembleNonFragment() { ByteBuf encode = RequestResponseFrameFlyweight.encode( - allocator, 1, false, DefaultPayload.create(Unpooled.wrappedBuffer(data))); + allocator, 1, false, null, Unpooled.wrappedBuffer(data)); when(delegate.receive()).thenReturn(Flux.just(encode)); when(delegate.onClose()).thenReturn(Mono.never()); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java deleted file mode 100644 index 6f9113d73..000000000 --- a/rsocket-core/src/test/java/io/rsocket/frame/DataAndMetadataFlyweightTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.rsocket.frame; - -import io.netty.buffer.*; -import org.junit.jupiter.api.Test; - -class DataAndMetadataFlyweightTest { - @Test - void testEncodeData() { - ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.PAYLOAD, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf frame = DataAndMetadataFlyweight.encodeOnlyData(ByteBufAllocator.DEFAULT, header, data); - ByteBuf d = DataAndMetadataFlyweight.data(frame, false); - String s = ByteBufUtil.prettyHexDump(d); - System.out.println(s); - } - - @Test - void testEncodeMetadata() { - ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.PAYLOAD, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf frame = - DataAndMetadataFlyweight.encodeOnlyMetadata(ByteBufAllocator.DEFAULT, header, data); - ByteBuf d = DataAndMetadataFlyweight.data(frame, false); - String s = ByteBufUtil.prettyHexDump(d); - System.out.println(s); - } - - @Test - void testEncodeDataAndMetadata() { - ByteBuf header = - FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.REQUEST_RESPONSE, 0); - ByteBuf data = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf metadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf frame = - DataAndMetadataFlyweight.encode(ByteBufAllocator.DEFAULT, header, metadata, data); - ByteBuf m = DataAndMetadataFlyweight.metadata(frame, true); - String s = ByteBufUtil.prettyHexDump(m); - System.out.println(s); - FrameType frameType = FrameHeaderFlyweight.frameType(frame); - System.out.println(frameType); - - for (int i = 0; i < 10_000_000; i++) { - ByteBuf d1 = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm data_"); - ByteBuf m1 = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "_I'm metadata_"); - ByteBuf h1 = - FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 1, FrameType.REQUEST_RESPONSE, 0); - ByteBuf f1 = DataAndMetadataFlyweight.encode(ByteBufAllocator.DEFAULT, h1, m1, d1); - f1.release(); - } - } -} diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java index e337d4332..eea72c03e 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java @@ -35,7 +35,7 @@ void extensionData() { Assertions.assertFalse(FrameHeaderFlyweight.hasMetadata(extension)); Assertions.assertEquals(extendedType, ExtensionFrameFlyweight.extendedType(extension)); - Assertions.assertEquals(0, ExtensionFrameFlyweight.metadata(extension).readableBytes()); + Assertions.assertNull(ExtensionFrameFlyweight.metadata(extension)); Assertions.assertEquals(data, ExtensionFrameFlyweight.data(extension)); extension.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java index 9ef89326a..e78adf9f1 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java @@ -31,7 +31,7 @@ void nextCompleteData() { String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(nextComplete); Assertions.assertEquals("d", data); - Assertions.assertTrue(metadata.readableBytes() == 0); + Assertions.assertNull(metadata); nextComplete.release(); } @@ -68,7 +68,18 @@ void nextData() { String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(next); Assertions.assertEquals("d", data); - Assertions.assertTrue(metadata.readableBytes() == 0); + Assertions.assertNull(metadata); + next.release(); + } + + @Test + void nextDataEmptyMetadata() { + Payload payload = DefaultPayload.create("d".getBytes(), new byte[0]); + ByteBuf next = PayloadFrameFlyweight.encodeNext(ByteBufAllocator.DEFAULT, 1, payload); + String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); + ByteBuf metadata = PayloadFrameFlyweight.metadata(next); + Assertions.assertEquals("d", data); + Assertions.assertEquals(metadata.readableBytes(), 0); next.release(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java index a82c3e658..c19d4e1f4 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java @@ -22,13 +22,15 @@ void testEncoding() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Metadata - // | | | | ⌌Encoded Data - // __|________|_________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("000010000000011900000000010000026d6464", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Metadata Length + // | | | | ⌌Encoded Metadata + // | | | | | ⌌Encoded Data + // __|________|_________|______|____|___| + // ↓ ↓↓ ↓↓ ↓↓ ↓↓ ↓↓↓ + String expected = "000010000000011900000000010000026d6464"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -44,12 +46,14 @@ void testEncodingWithEmptyMetadata() { Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Data - // __|________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Metadata Length (0) + // | | | | ⌌Encoded Data + // __|________|_________|_______|___| + // ↓ ↓↓ ↓↓ ↓↓ ↓↓↓ + String expected = "00000e0000000119000000000100000064"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -66,12 +70,13 @@ void testEncodingWithNullMetadata() { frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - // Encoded Frame Length⌍ ⌌ Encoded Headers - // | | ⌌ Encoded Request(1) - // | | | ⌌Encoded Data - // __|________|_________|_____| - // ↓ ↓↓ ↓↓ ↓↓↓ - assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); + // Encoded FrameLength⌍ ⌌ Encoded Headers + // | | ⌌ Encoded Request(1) + // | | | ⌌Encoded Data + // __|________|_________|_____| + // ↓<-> ↓↓ <-> ↓↓ <-> ↓↓↓ + String expected = "00000b0000000118000000000164"; + assertEquals(expected, ByteBufUtil.hexDump(frame)); frame.release(); } @@ -110,7 +115,7 @@ void requestResponseData() { assertFalse(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); - assertTrue(metadata.readableBytes() == 0); + assertNull(metadata); request.release(); } @@ -145,13 +150,13 @@ void requestStreamDataMetadata() { Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(Integer.MAX_VALUE, actualRequest); + assertEquals(Long.MAX_VALUE, actualRequest); assertEquals("md", metadata); assertEquals("d", data); request.release(); @@ -168,13 +173,13 @@ void requestStreamData() { null, Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); ByteBuf metadata = RequestStreamFrameFlyweight.metadata(request); assertFalse(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(42, actualRequest); - assertTrue(metadata.readableBytes() == 0); + assertEquals(42L, actualRequest); + assertNull(metadata); assertEquals("d", data); request.release(); } @@ -190,13 +195,13 @@ void requestStreamMetadata() { Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), Unpooled.EMPTY_BUFFER); - int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); + long actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); ByteBuf data = RequestStreamFrameFlyweight.data(request); String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); - assertEquals(42, actualRequest); + assertEquals(42L, actualRequest); assertTrue(data.readableBytes() == 0); assertEquals("md", metadata); request.release(); @@ -237,7 +242,7 @@ void requestFnfData() { assertFalse(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); - assertTrue(metadata.readableBytes() == 0); + assertNull(metadata); request.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java index 09aa8f325..2ad944d09 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/ByteBufPayloadTest.java @@ -25,11 +25,11 @@ public void shouldIndicateThatItHasNotMetadata() { } @Test - public void shouldIndicateThatItHasNotMetadata1() { + public void shouldIndicateThatItHasMetadata1() { Payload payload = ByteBufPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); Assertions.assertThat(payload.release()).isTrue(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java index 7ebcd1a74..6bae0886b 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java @@ -60,18 +60,18 @@ public void shouldIndicateThatItHasNotMetadata() { } @Test - public void shouldIndicateThatItHasNotMetadata1() { + public void shouldIndicateThatItHasMetadata1() { Payload payload = DefaultPayload.create(Unpooled.wrappedBuffer("data".getBytes()), Unpooled.EMPTY_BUFFER); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); } @Test - public void shouldIndicateThatItHasNotMetadata2() { + public void shouldIndicateThatItHasMetadata2() { Payload payload = DefaultPayload.create(ByteBuffer.wrap("data".getBytes()), ByteBuffer.allocate(0)); - Assertions.assertThat(payload.hasMetadata()).isFalse(); + Assertions.assertThat(payload.hasMetadata()).isTrue(); } } From 5dbacac2e44c5feb4afcb508f9e5172be7a974f5 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 23 Apr 2020 16:52:02 +0300 Subject: [PATCH 5/8] fixes failing tests related to changes of initialRequestN Signed-off-by: Oleh Dokuka --- .../src/test/java/io/rsocket/core/RSocketRequesterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index da2755cc3..533972b76 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -145,7 +145,7 @@ protected void hookOnSubscribe(Subscription subscription) { ByteBuf f = sent.get(0); assertThat("initial frame", frameType(f), is(REQUEST_STREAM)); - assertThat("initial request n", RequestStreamFrameFlyweight.initialRequestN(f), is(5)); + assertThat("initial request n", RequestStreamFrameFlyweight.initialRequestN(f), is(5L)); assertThat("should be released", f.release(), is(true)); rule.assertHasNoLeaks(); } @@ -332,7 +332,7 @@ protected void hookOnSubscribe(Subscription subscription) {} Assertions.assertThat(FrameHeaderFlyweight.frameType(initialFrame)).isEqualTo(REQUEST_CHANNEL); Assertions.assertThat(RequestChannelFrameFlyweight.initialRequestN(initialFrame)) - .isEqualTo(Integer.MAX_VALUE); + .isEqualTo(Long.MAX_VALUE); Assertions.assertThat( RequestChannelFrameFlyweight.data(initialFrame).toString(CharsetUtil.UTF_8)) .isEqualTo("0"); From 5c7bdea9bb584a787443977e31776fb50095bf56 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 23 Apr 2020 17:04:49 +0300 Subject: [PATCH 6/8] fixes compilation errors Signed-off-by: Oleh Dokuka --- rsocket-test/src/main/java/io/rsocket/test/TestFrames.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java index 2651b14ec..9d78b2f3f 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java @@ -87,12 +87,12 @@ public static ByteBuf createTestRequestNFrame() { /** @return {@link ByteBuf} representing test instance of Request-Response frame */ public static ByteBuf createTestRequestResponseFrame() { - return RequestResponseFrameFlyweight.encode(allocator, 1, false, emptyPayload); + return RequestResponseFrameFlyweight.encode(allocator, 1, emptyPayload); } /** @return {@link ByteBuf} representing test instance of Request-Stream frame */ public static ByteBuf createTestRequestStreamFrame() { - return RequestStreamFrameFlyweight.encode(allocator, 1, false, 1L, emptyPayload); + return RequestStreamFrameFlyweight.encode(allocator, 1, 1L, emptyPayload); } /** @return {@link ByteBuf} representing test instance of Setup frame */ From 9d110facdf904926264e1355659a08a73b7bbad7 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 23 Apr 2020 17:15:19 +0300 Subject: [PATCH 7/8] uncomment assertions related to proper metadata propagation Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/core/RSocketTest.java | 222 +++++++++--------- 1 file changed, 107 insertions(+), 115 deletions(-) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index f29f4409c..568f8eed6 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -183,231 +183,223 @@ public Flux requestChannel(Publisher payloads) { @Test public void requestChannelCase_StreamIsTerminatedAfterBothSidesSentCompletion1() { - TestPublisher outerPublisher = TestPublisher.create(); - AssertSubscriber outerAssertSubscriber = new AssertSubscriber<>(0); + TestPublisher requesterPublisher = TestPublisher.create(); + AssertSubscriber requesterSubscriber = new AssertSubscriber<>(0); - AssertSubscriber innerAssertSubscriber = new AssertSubscriber<>(0); - TestPublisher innerPublisher = TestPublisher.create(); + AssertSubscriber responderSubscriber = new AssertSubscriber<>(0); + TestPublisher responderPublisher = TestPublisher.create(); initRequestChannelCase( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); - nextFromOuterPublisher(outerPublisher, innerAssertSubscriber); + nextFromRequesterPublisher(requesterPublisher, responderSubscriber); - completeFromOuterPublisher(outerPublisher, innerAssertSubscriber); + completeFromRequesterPublisher(requesterPublisher, responderSubscriber); - nextFromInnerPublisher(innerPublisher, outerAssertSubscriber); + nextFromResponderPublisher(responderPublisher, requesterSubscriber); - completeFromInnerPublisher(innerPublisher, outerAssertSubscriber); + completeFromResponderPublisher(responderPublisher, requesterSubscriber); } @Test public void requestChannelCase_StreamIsTerminatedAfterBothSidesSentCompletion2() { - TestPublisher outerPublisher = TestPublisher.create(); - AssertSubscriber outerAssertSubscriber = new AssertSubscriber<>(0); + TestPublisher requesterPublisher = TestPublisher.create(); + AssertSubscriber requesterSubscriber = new AssertSubscriber<>(0); - AssertSubscriber innerAssertSubscriber = new AssertSubscriber<>(0); - TestPublisher innerPublisher = TestPublisher.create(); + AssertSubscriber responderSubscriber = new AssertSubscriber<>(0); + TestPublisher responderPublisher = TestPublisher.create(); initRequestChannelCase( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); - nextFromInnerPublisher(innerPublisher, outerAssertSubscriber); + nextFromResponderPublisher(responderPublisher, requesterSubscriber); - completeFromInnerPublisher(innerPublisher, outerAssertSubscriber); + completeFromResponderPublisher(responderPublisher, requesterSubscriber); - nextFromOuterPublisher(outerPublisher, innerAssertSubscriber); + nextFromRequesterPublisher(requesterPublisher, responderSubscriber); - completeFromOuterPublisher(outerPublisher, innerAssertSubscriber); + completeFromRequesterPublisher(requesterPublisher, responderSubscriber); } @Test public void requestChannelCase_CancellationFromResponderShouldLeaveStreamInHalfClosedStateWithNextCompletionPossibleFromRequester() { - TestPublisher outerPublisher = TestPublisher.create(); - AssertSubscriber outerAssertSubscriber = new AssertSubscriber<>(0); + TestPublisher requesterPublisher = TestPublisher.create(); + AssertSubscriber requesterSubscriber = new AssertSubscriber<>(0); - AssertSubscriber innerAssertSubscriber = new AssertSubscriber<>(0); - TestPublisher innerPublisher = TestPublisher.create(); + AssertSubscriber responderSubscriber = new AssertSubscriber<>(0); + TestPublisher responderPublisher = TestPublisher.create(); initRequestChannelCase( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); - nextFromOuterPublisher(outerPublisher, innerAssertSubscriber); + nextFromRequesterPublisher(requesterPublisher, responderSubscriber); - cancelFromInnerSubscriber(outerPublisher, innerAssertSubscriber); + cancelFromResponderSubscriber(requesterPublisher, responderSubscriber); - nextFromInnerPublisher(innerPublisher, outerAssertSubscriber); + nextFromResponderPublisher(responderPublisher, requesterSubscriber); - completeFromInnerPublisher(innerPublisher, outerAssertSubscriber); + completeFromResponderPublisher(responderPublisher, requesterSubscriber); } @Test public void requestChannelCase_CompletionFromRequesterShouldLeaveStreamInHalfClosedStateWithNextCancellationPossibleFromResponder() { - TestPublisher outerPublisher = TestPublisher.create(); - AssertSubscriber outerAssertSubscriber = new AssertSubscriber<>(0); + TestPublisher requesterPublisher = TestPublisher.create(); + AssertSubscriber requesterSubscriber = new AssertSubscriber<>(0); - AssertSubscriber innerAssertSubscriber = new AssertSubscriber<>(0); - TestPublisher innerPublisher = TestPublisher.create(); + AssertSubscriber responderSubscriber = new AssertSubscriber<>(0); + TestPublisher responderPublisher = TestPublisher.create(); initRequestChannelCase( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); - nextFromInnerPublisher(innerPublisher, outerAssertSubscriber); + nextFromResponderPublisher(responderPublisher, requesterSubscriber); - completeFromInnerPublisher(innerPublisher, outerAssertSubscriber); + completeFromResponderPublisher(responderPublisher, requesterSubscriber); - nextFromOuterPublisher(outerPublisher, innerAssertSubscriber); + nextFromRequesterPublisher(requesterPublisher, responderSubscriber); - cancelFromInnerSubscriber(outerPublisher, innerAssertSubscriber); + cancelFromResponderSubscriber(requesterPublisher, responderSubscriber); } @Test public void requestChannelCase_ensureThatRequesterSubscriberCancellationTerminatesStreamsOnBothSides() { - TestPublisher outerPublisher = TestPublisher.create(); - AssertSubscriber outerAssertSubscriber = new AssertSubscriber<>(0); + TestPublisher requesterPublisher = TestPublisher.create(); + AssertSubscriber requesterSubscriber = new AssertSubscriber<>(0); - AssertSubscriber innerAssertSubscriber = new AssertSubscriber<>(0); - TestPublisher innerPublisher = TestPublisher.create(); + AssertSubscriber responderSubscriber = new AssertSubscriber<>(0); + TestPublisher responderPublisher = TestPublisher.create(); initRequestChannelCase( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); - nextFromInnerPublisher(innerPublisher, outerAssertSubscriber); + nextFromResponderPublisher(responderPublisher, requesterSubscriber); - nextFromOuterPublisher(outerPublisher, innerAssertSubscriber); + nextFromRequesterPublisher(requesterPublisher, responderSubscriber); // ensures both sides are terminated - cancelFromOuterSubscriber( - outerPublisher, outerAssertSubscriber, innerPublisher, innerAssertSubscriber); + cancelFromRequesterSubscriber( + requesterPublisher, requesterSubscriber, responderPublisher, responderSubscriber); } void initRequestChannelCase( - TestPublisher outerPublisher, - AssertSubscriber outerAssertSubscriber, - TestPublisher innerPublisher, - AssertSubscriber innerAssertSubscriber) { + TestPublisher requesterPublisher, + AssertSubscriber requesterSubscriber, + TestPublisher responderPublisher, + AssertSubscriber responderSubscriber) { rule.setRequestAcceptor( new AbstractRSocket() { @Override public Flux requestChannel(Publisher payloads) { - payloads.subscribe(innerAssertSubscriber); - return innerPublisher.flux(); + payloads.subscribe(responderSubscriber); + return responderPublisher.flux(); } }); - rule.crs.requestChannel(outerPublisher).subscribe(outerAssertSubscriber); + rule.crs.requestChannel(requesterPublisher).subscribe(requesterSubscriber); - outerPublisher.assertWasSubscribed(); - outerAssertSubscriber.assertSubscribed(); + requesterPublisher.assertWasSubscribed(); + requesterSubscriber.assertSubscribed(); - innerAssertSubscriber.assertNotSubscribed(); - innerPublisher.assertWasNotSubscribed(); + responderSubscriber.assertNotSubscribed(); + responderPublisher.assertWasNotSubscribed(); // firstRequest - outerAssertSubscriber.request(1); - outerPublisher.assertMaxRequested(1); - outerPublisher.next(DefaultPayload.create("initialData", "initialMetadata")); + requesterSubscriber.request(1); + requesterPublisher.assertMaxRequested(1); + requesterPublisher.next(DefaultPayload.create("initialData", "initialMetadata")); - innerAssertSubscriber.assertSubscribed(); - innerPublisher.assertWasSubscribed(); + responderSubscriber.assertSubscribed(); + responderPublisher.assertWasSubscribed(); } - void nextFromOuterPublisher( - TestPublisher outerPublisher, AssertSubscriber innerAssertSubscriber) { + void nextFromRequesterPublisher( + TestPublisher requesterPublisher, AssertSubscriber responderSubscriber) { // ensures that outerUpstream and innerSubscriber is not terminated so the requestChannel - outerPublisher.assertSubscribers(1); - innerAssertSubscriber.assertNotTerminated(); + requesterPublisher.assertSubscribers(1); + responderSubscriber.assertNotTerminated(); - innerAssertSubscriber.request(6); - outerPublisher.next( + responderSubscriber.request(6); + requesterPublisher.next( DefaultPayload.create("d1", "m1"), DefaultPayload.create("d2"), DefaultPayload.create("d3", "m3"), DefaultPayload.create("d4"), DefaultPayload.create("d5", "m5")); - List innerPayloads = innerAssertSubscriber.awaitAndAssertNextValueCount(6).values(); + List innerPayloads = responderSubscriber.awaitAndAssertNextValueCount(6).values(); Assertions.assertThat(innerPayloads.stream().map(Payload::getDataUtf8)) .containsExactly("initialData", "d1", "d2", "d3", "d4", "d5"); - // fixme: incorrect behaviour of metadata encoding - // Assertions - // .assertThat(innerPayloads - // .stream() - // .map(Payload::hasMetadata) - // ) - // .containsExactly(true, true, false, true, false, true); + Assertions.assertThat(innerPayloads.stream().map(Payload::hasMetadata)) + .containsExactly(true, true, false, true, false, true); Assertions.assertThat(innerPayloads.stream().map(Payload::getMetadataUtf8)) .containsExactly("initialMetadata", "m1", "", "m3", "", "m5"); } - void completeFromOuterPublisher( - TestPublisher outerPublisher, AssertSubscriber innerAssertSubscriber) { + void completeFromRequesterPublisher( + TestPublisher requesterPublisher, AssertSubscriber responderSubscriber) { // ensures that after sending complete upstream part is closed - outerPublisher.complete(); - innerAssertSubscriber.assertTerminated(); - outerPublisher.assertNoSubscribers(); + requesterPublisher.complete(); + responderSubscriber.assertTerminated(); + requesterPublisher.assertNoSubscribers(); } - void cancelFromInnerSubscriber( - TestPublisher outerPublisher, AssertSubscriber innerAssertSubscriber) { + void cancelFromResponderSubscriber( + TestPublisher requesterPublisher, AssertSubscriber responderSubscriber) { // ensures that after sending complete upstream part is closed - innerAssertSubscriber.cancel(); - outerPublisher.assertWasCancelled(); - outerPublisher.assertNoSubscribers(); + responderSubscriber.cancel(); + requesterPublisher.assertWasCancelled(); + requesterPublisher.assertNoSubscribers(); } - void nextFromInnerPublisher( - TestPublisher innerPublisher, AssertSubscriber outerAssertSubscriber) { + void nextFromResponderPublisher( + TestPublisher responderPublisher, AssertSubscriber requesterSubscriber) { // ensures that downstream is not terminated so the requestChannel state is half-closed - innerPublisher.assertSubscribers(1); - outerAssertSubscriber.assertNotTerminated(); + responderPublisher.assertSubscribers(1); + requesterSubscriber.assertNotTerminated(); - // ensures innerPublisher can send messages and outerSubscriber can receive them - outerAssertSubscriber.request(5); - innerPublisher.next( + // ensures responderPublisher can send messages and outerSubscriber can receive them + requesterSubscriber.request(5); + responderPublisher.next( DefaultPayload.create("rd1", "rm1"), DefaultPayload.create("rd2"), DefaultPayload.create("rd3", "rm3"), DefaultPayload.create("rd4"), DefaultPayload.create("rd5", "rm5")); - List outerPayloads = outerAssertSubscriber.awaitAndAssertNextValueCount(5).values(); + List outerPayloads = requesterSubscriber.awaitAndAssertNextValueCount(5).values(); Assertions.assertThat(outerPayloads.stream().map(Payload::getDataUtf8)) .containsExactly("rd1", "rd2", "rd3", "rd4", "rd5"); - // fixme: incorrect behaviour of metadata encoding - // Assertions - // .assertThat(outerPayloads - // .stream() - // .map(Payload::hasMetadata) - // ) - // .containsExactly(true, false, true, false, true); + Assertions.assertThat(outerPayloads.stream().map(Payload::hasMetadata)) + .containsExactly(true, false, true, false, true); Assertions.assertThat(outerPayloads.stream().map(Payload::getMetadataUtf8)) .containsExactly("rm1", "", "rm3", "", "rm5"); } - void completeFromInnerPublisher( - TestPublisher innerPublisher, AssertSubscriber outerAssertSubscriber) { + void completeFromResponderPublisher( + TestPublisher responderPublisher, AssertSubscriber requesterSubscriber) { // ensures that after sending complete inner upstream is closed - innerPublisher.complete(); - outerAssertSubscriber.assertTerminated(); - innerPublisher.assertNoSubscribers(); + responderPublisher.complete(); + requesterSubscriber.assertTerminated(); + responderPublisher.assertNoSubscribers(); } - void cancelFromOuterSubscriber( - TestPublisher outerPublisher, - AssertSubscriber outerAssertSubscriber, - TestPublisher innerPublisher, - AssertSubscriber innerAssertSubscriber) { + void cancelFromRequesterSubscriber( + TestPublisher requesterPublisher, + AssertSubscriber requesterSubscriber, + TestPublisher responderPublisher, + AssertSubscriber responderSubscriber) { // ensures that after sending cancel the whole requestChannel is terminated - outerAssertSubscriber.cancel(); - innerPublisher.assertWasCancelled(); - innerPublisher.assertNoSubscribers(); + requesterSubscriber.cancel(); + // error should be propagated + responderSubscriber.assertTerminated(); + responderPublisher.assertWasCancelled(); + responderPublisher.assertNoSubscribers(); // ensures that cancellation is propagated to the actual upstream - outerPublisher.assertWasCancelled(); - outerPublisher.assertNoSubscribers(); + requesterPublisher.assertWasCancelled(); + requesterPublisher.assertNoSubscribers(); } public static class SocketRule extends ExternalResource { From a821f70755cb8c2ee8484c7667ddf2f0da76f25e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 24 Apr 2020 10:21:08 +0300 Subject: [PATCH 8/8] moves payload releasing to codecs Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 23 +++++------ .../io/rsocket/core/RSocketResponder.java | 25 +++-------- .../frame/MetadataPushFrameFlyweight.java | 8 ++++ .../rsocket/frame/PayloadFrameFlyweight.java | 41 ++++++++++--------- .../frame/RequestChannelFrameFlyweight.java | 4 +- .../RequestFireAndForgetFrameFlyweight.java | 5 ++- .../frame/RequestResponseFrameFlyweight.java | 5 ++- .../frame/RequestStreamFrameFlyweight.java | 4 +- .../io/rsocket/core/RSocketRequesterTest.java | 6 ++- .../io/rsocket/core/RSocketResponderTest.java | 14 +++++-- .../FragmentationIntegrationTest.java | 3 +- .../rsocket/frame/PayloadFlyweightTest.java | 18 +++++--- .../main/java/io/rsocket/test/TestFrames.java | 4 +- .../java/io/rsocket/test/TransportTest.java | 10 ++++- 14 files changed, 98 insertions(+), 72 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index c3cbbdfa6..42a6a524d 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -212,8 +212,8 @@ private Mono handleFireAndForget(Payload payload) { return UnicastMonoEmpty.newInstance( () -> { ByteBuf requestFrame = - RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); - payload.release(); + RequestFireAndForgetFrameFlyweight.encodeReleasingPayload( + allocator, streamId, payload); sendProcessor.onNext(requestFrame); }); @@ -240,8 +240,8 @@ private Mono handleRequestResponse(final Payload payload) { @Override public void doOnSubscribe() { final ByteBuf requestFrame = - RequestResponseFrameFlyweight.encode(allocator, streamId, payload); - payload.release(); + RequestResponseFrameFlyweight.encodeReleasingPayload( + allocator, streamId, payload); sendProcessor.onNext(requestFrame); } @@ -294,8 +294,8 @@ public void accept(long n) { firstRequest = false; if (!payloadReleasedFlag.getAndSet(true)) { sendProcessor.onNext( - RequestStreamFrameFlyweight.encode(allocator, streamId, n, payload)); - payload.release(); + RequestStreamFrameFlyweight.encodeReleasingPayload( + allocator, streamId, n, payload)); } } else if (contains(streamId) && !receiver.isDisposed()) { sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); @@ -383,10 +383,10 @@ protected void hookOnNext(Payload payload) { receiver.onError(t); return; } - final ByteBuf frame = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload); + final ByteBuf frame = + PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload); sendProcessor.onNext(frame); - payload.release(); } @Override @@ -427,12 +427,10 @@ public void accept(long n) { .subscribe(upstreamSubscriber); if (!payloadReleasedFlag.getAndSet(true)) { ByteBuf frame = - RequestChannelFrameFlyweight.encode( + RequestChannelFrameFlyweight.encodeReleasingPayload( allocator, streamId, false, n, initialPayload); sendProcessor.onNext(frame); - - initialPayload.release(); } } else { sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n)); @@ -474,8 +472,7 @@ private Mono handleMetadataPush(Payload payload) { return UnicastMonoEmpty.newInstance( () -> { ByteBuf metadataPushFrame = - MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain()); - payload.release(); + MetadataPushFrameFlyweight.encodeReleasingPayload(allocator, payload); sendProcessor.onNextPrioritized(metadataPushFrame); }); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 3860b8dae..5aef7eed2 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -399,16 +399,9 @@ protected void hookOnNext(Payload payload) { return; } - ByteBuf byteBuf; - try { - byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload); - } catch (Throwable t) { - payload.release(); - throw Exceptions.propagate(t); - } - - payload.release(); - + ByteBuf byteBuf = + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + allocator, streamId, payload); sendProcessor.onNext(byteBuf); } @@ -469,16 +462,8 @@ protected void hookOnNext(Payload payload) { return; } - ByteBuf byteBuf; - try { - byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload); - } catch (Throwable t) { - payload.release(); - throw Exceptions.propagate(t); - } - - payload.release(); - + ByteBuf byteBuf = + PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload); sendProcessor.onNext(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameFlyweight.java index d37b573ba..e3a9a47ba 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameFlyweight.java @@ -2,8 +2,16 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.rsocket.Payload; public class MetadataPushFrameFlyweight { + + public static ByteBuf encodeReleasingPayload(ByteBufAllocator allocator, Payload payload) { + final ByteBuf metadata = payload.metadata().retain(); + payload.release(); + return encode(allocator, metadata); + } + public static ByteBuf encode(ByteBufAllocator allocator, ByteBuf metadata) { ByteBuf header = FrameHeaderFlyweight.encodeStreamZero( diff --git a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java index 1b5204036..4c2ebdf6e 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java @@ -9,40 +9,43 @@ public class PayloadFrameFlyweight { private PayloadFrameFlyweight() {} - public static ByteBuf encode( - ByteBufAllocator allocator, - int streamId, - boolean fragmentFollows, - boolean complete, - boolean next, - ByteBuf metadata, - ByteBuf data) { - return FLYWEIGHT.encode( - allocator, streamId, fragmentFollows, complete, next, 0, metadata, data); - } - - public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { - return encode(allocator, streamId, false, payload); + public static ByteBuf encodeNextReleasingPayload( + ByteBufAllocator allocator, int streamId, Payload payload) { + return encodeReleasingPayload(allocator, streamId, false, payload); } - public static ByteBuf encodeNextComplete( + public static ByteBuf encodeNextCompleteReleasingPayload( ByteBufAllocator allocator, int streamId, Payload payload) { - return encode(allocator, streamId, true, payload); + return encodeReleasingPayload(allocator, streamId, true, payload); } - static ByteBuf encode( + static ByteBuf encodeReleasingPayload( ByteBufAllocator allocator, int streamId, boolean complete, Payload payload) { final boolean hasMetadata = payload.hasMetadata(); final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; final ByteBuf data = payload.data().retain(); - return FLYWEIGHT.encode(allocator, streamId, false, complete, true, 0, metadata, data); + payload.release(); + + return encode(allocator, streamId, false, complete, true, metadata, data); } public static ByteBuf encodeComplete(ByteBufAllocator allocator, int streamId) { - return FLYWEIGHT.encode(allocator, streamId, false, true, false, 0, null, null); + return encode(allocator, streamId, false, true, false, null, null); + } + + public static ByteBuf encode( + ByteBufAllocator allocator, + int streamId, + boolean fragmentFollows, + boolean complete, + boolean next, + ByteBuf metadata, + ByteBuf data) { + return FLYWEIGHT.encode( + allocator, streamId, fragmentFollows, complete, next, 0, metadata, data); } public static ByteBuf data(ByteBuf byteBuf) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java index 2a644b07a..7c3cbb574 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java @@ -10,7 +10,7 @@ public class RequestChannelFrameFlyweight { private RequestChannelFrameFlyweight() {} - public static ByteBuf encode( + public static ByteBuf encodeReleasingPayload( ByteBufAllocator allocator, int streamId, boolean complete, @@ -21,6 +21,8 @@ public static ByteBuf encode( final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; final ByteBuf data = payload.data().retain(); + payload.release(); + return encode(allocator, streamId, false, complete, initialRequestN, metadata, data); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java index 13b7d907d..287f765f7 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java @@ -10,12 +10,15 @@ public class RequestFireAndForgetFrameFlyweight { private RequestFireAndForgetFrameFlyweight() {} - public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + public static ByteBuf encodeReleasingPayload( + ByteBufAllocator allocator, int streamId, Payload payload) { final boolean hasMetadata = payload.hasMetadata(); final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; final ByteBuf data = payload.data().retain(); + payload.release(); + return FLYWEIGHT.encode(allocator, streamId, false, metadata, data); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java index d328c7fa2..3fbac27d2 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java @@ -10,12 +10,15 @@ public class RequestResponseFrameFlyweight { private RequestResponseFrameFlyweight() {} - public static ByteBuf encode(ByteBufAllocator allocator, int streamId, Payload payload) { + public static ByteBuf encodeReleasingPayload( + ByteBufAllocator allocator, int streamId, Payload payload) { final boolean hasMetadata = payload.hasMetadata(); final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; final ByteBuf data = payload.data().retain(); + payload.release(); + return encode(allocator, streamId, false, metadata, data); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java index 1c06d80c7..ff1435652 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java @@ -10,13 +10,15 @@ public class RequestStreamFrameFlyweight { private RequestStreamFrameFlyweight() {} - public static ByteBuf encode( + public static ByteBuf encodeReleasingPayload( ByteBufAllocator allocator, int streamId, long initialRequestN, Payload payload) { final boolean hasMetadata = payload.hasMetadata(); final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null; final ByteBuf data = payload.data().retain(); + payload.release(); + return encode(allocator, streamId, false, initialRequestN, metadata, data); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 533972b76..bc19d8132 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -194,7 +194,8 @@ public void testHandleValidFrame() { int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE); rule.connection.addToReceivedBuffer( - PayloadFrameFlyweight.encodeNext(rule.alloc(), streamId, EmptyPayload.INSTANCE)); + PayloadFrameFlyweight.encodeNextReleasingPayload( + rule.alloc(), streamId, EmptyPayload.INSTANCE)); verify(sub).onComplete(); Assertions.assertThat(rule.connection.getSent()).hasSize(1).allMatch(ReferenceCounted::release); @@ -731,7 +732,8 @@ public int sendRequestResponse(Publisher response) { response.subscribe(sub); int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE); rule.connection.addToReceivedBuffer( - PayloadFrameFlyweight.encodeNextComplete(rule.alloc(), streamId, EmptyPayload.INSTANCE)); + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + rule.alloc(), streamId, EmptyPayload.INSTANCE)); verify(sub).onNext(any(Payload.class)); verify(sub).onComplete(); return streamId; diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 726be0770..78027aa3d 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -773,16 +773,22 @@ private void sendRequest(int streamId, FrameType frameType, Payload payload) { switch (frameType) { case REQUEST_CHANNEL: request = - RequestChannelFrameFlyweight.encode(allocator, streamId, false, prefetch, payload); + RequestChannelFrameFlyweight.encodeReleasingPayload( + allocator, streamId, false, prefetch, payload); break; case REQUEST_STREAM: - request = RequestStreamFrameFlyweight.encode(allocator, streamId, prefetch, payload); + request = + RequestStreamFrameFlyweight.encodeReleasingPayload( + allocator, streamId, prefetch, payload); break; case REQUEST_RESPONSE: - request = RequestResponseFrameFlyweight.encode(allocator, streamId, payload); + request = + RequestResponseFrameFlyweight.encodeReleasingPayload(allocator, streamId, payload); break; case REQUEST_FNF: - request = RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload); + request = + RequestFireAndForgetFrameFlyweight.encodeReleasingPayload( + allocator, streamId, payload); break; default: throw new IllegalArgumentException("unsupported type: " + frameType); diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java index 984207936..a8569ef3b 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationIntegrationTest.java @@ -28,7 +28,8 @@ public class FragmentationIntegrationTest { @Test void fragmentAndReassembleData() { ByteBuf frame = - PayloadFrameFlyweight.encodeNextComplete(allocator, 2, DefaultPayload.create(data)); + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + allocator, 2, DefaultPayload.create(data)); System.out.println(FrameUtil.toString(frame)); frame.retain(); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java index e78adf9f1..439d23c15 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java @@ -15,7 +15,8 @@ public class PayloadFlyweightTest { void nextCompleteDataMetadata() { Payload payload = DefaultPayload.create("d", "md"); ByteBuf nextComplete = - PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); String metadata = PayloadFrameFlyweight.metadata(nextComplete).toString(StandardCharsets.UTF_8); Assertions.assertEquals("d", data); @@ -27,7 +28,8 @@ void nextCompleteDataMetadata() { void nextCompleteData() { Payload payload = DefaultPayload.create("d"); ByteBuf nextComplete = - PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(nextComplete); Assertions.assertEquals("d", data); @@ -42,7 +44,8 @@ void nextCompleteMetaData() { Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer("md".getBytes(StandardCharsets.UTF_8))); ByteBuf nextComplete = - PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + PayloadFrameFlyweight.encodeNextCompleteReleasingPayload( + ByteBufAllocator.DEFAULT, 1, payload); ByteBuf data = PayloadFrameFlyweight.data(nextComplete); String metadata = PayloadFrameFlyweight.metadata(nextComplete).toString(StandardCharsets.UTF_8); Assertions.assertTrue(data.readableBytes() == 0); @@ -53,7 +56,8 @@ void nextCompleteMetaData() { @Test void nextDataMetadata() { Payload payload = DefaultPayload.create("d", "md"); - ByteBuf next = PayloadFrameFlyweight.encodeNext(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf next = + PayloadFrameFlyweight.encodeNextReleasingPayload(ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); String metadata = PayloadFrameFlyweight.metadata(next).toString(StandardCharsets.UTF_8); Assertions.assertEquals("d", data); @@ -64,7 +68,8 @@ void nextDataMetadata() { @Test void nextData() { Payload payload = DefaultPayload.create("d"); - ByteBuf next = PayloadFrameFlyweight.encodeNext(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf next = + PayloadFrameFlyweight.encodeNextReleasingPayload(ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(next); Assertions.assertEquals("d", data); @@ -75,7 +80,8 @@ void nextData() { @Test void nextDataEmptyMetadata() { Payload payload = DefaultPayload.create("d".getBytes(), new byte[0]); - ByteBuf next = PayloadFrameFlyweight.encodeNext(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf next = + PayloadFrameFlyweight.encodeNextReleasingPayload(ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(next).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(next); Assertions.assertEquals("d", data); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java index 9d78b2f3f..60ff05124 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java @@ -87,12 +87,12 @@ public static ByteBuf createTestRequestNFrame() { /** @return {@link ByteBuf} representing test instance of Request-Response frame */ public static ByteBuf createTestRequestResponseFrame() { - return RequestResponseFrameFlyweight.encode(allocator, 1, emptyPayload); + return RequestResponseFrameFlyweight.encodeReleasingPayload(allocator, 1, emptyPayload); } /** @return {@link ByteBuf} representing test instance of Request-Stream frame */ public static ByteBuf createTestRequestStreamFrame() { - return RequestStreamFrameFlyweight.encode(allocator, 1, 1L, emptyPayload); + return RequestStreamFrameFlyweight.encodeReleasingPayload(allocator, 1, 1L, emptyPayload); } /** @return {@link ByteBuf} representing test instance of Setup frame */ diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index e4ff75b2a..583f58634 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -35,10 +35,12 @@ import java.util.zip.GZIPInputStream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; @@ -64,9 +66,15 @@ static String read(String resourceName) { } } + @BeforeEach + default void setUp() { + Hooks.onOperatorDebug(); + } + @AfterEach default void close() { getTransportPair().dispose(); + Hooks.resetOnOperatorDebug(); } default Payload createTestPayload(int metadataPresent) { @@ -175,7 +183,7 @@ default void requestChannel200_000() { .verify(getTimeout()); } - @DisplayName("makes 1 requestChannel request with 2,000 large payloads") + @DisplayName("makes 1 requestChannel request with 200 large payloads") @Test default void largePayloadRequestChannel200() { Flux payloads = Flux.range(0, 200).map(__ -> LARGE_PAYLOAD);