Skip to content

Commit

Permalink
provides refactoring related to ensuring that hasMetadata is propagat…
Browse files Browse the repository at this point in the history
…ed correctly

Right now there was observed a few issues related to that

Payload with no metadata was incorrectly incoded so it results to hasMetadata true on the received side
Also, optimized the API of Flyweights to ensure that we have common things in one place

Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Apr 23, 2020
1 parent c93c456 commit 6f74ae9
Show file tree
Hide file tree
Showing 31 changed files with 572 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
Expand All @@ -40,7 +41,8 @@ public boolean hasMetadata() {

@Override
public ByteBuf sliceMetadata() {
return SetupFrameFlyweight.metadata(setupFrame);
final ByteBuf metadata = SetupFrameFlyweight.metadata(setupFrame);
return metadata == null ? Unpooled.EMPTY_BUFFER : metadata;
}

@Override
Expand Down
39 changes: 8 additions & 31 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
return UnicastMonoEmpty.newInstance(
() -> {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
RequestFireAndForgetFrameFlyweight.encode(allocator, streamId, payload);
payload.release();

sendProcessor.onNext(requestFrame);
Expand Down Expand Up @@ -245,12 +240,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
@Override
public void doOnSubscribe() {
final ByteBuf requestFrame =
RequestResponseFrameFlyweight.encode(
allocator,
streamId,
false,
payload.sliceMetadata().retain(),
payload.sliceData().retain());
RequestResponseFrameFlyweight.encode(allocator, streamId, payload);
payload.release();

sendProcessor.onNext(requestFrame);
Expand Down Expand Up @@ -302,15 +292,9 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
public void accept(long n) {
if (firstRequest && !receiver.isDisposed()) {
firstRequest = false;
sendProcessor.onNext(
RequestStreamFrameFlyweight.encode(
allocator,
streamId,
false,
n,
payload.sliceMetadata().retain(),
payload.sliceData().retain()));
if (!payloadReleasedFlag.getAndSet(true)) {
sendProcessor.onNext(
RequestStreamFrameFlyweight.encode(allocator, streamId, n, payload));
payload.release();
}
} else if (contains(streamId) && !receiver.isDisposed()) {
Expand Down Expand Up @@ -399,8 +383,7 @@ protected void hookOnNext(Payload payload) {
receiver.onError(t);
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
final ByteBuf frame = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);

sendProcessor.onNext(frame);
payload.release();
Expand Down Expand Up @@ -445,13 +428,7 @@ public void accept(long n) {
if (!payloadReleasedFlag.getAndSet(true)) {
ByteBuf frame =
RequestChannelFrameFlyweight.encode(
allocator,
streamId,
false,
false,
n,
initialPayload.sliceMetadata().retain(),
initialPayload.sliceData().retain());
allocator, streamId, false, n, initialPayload);

sendProcessor.onNext(frame);

Expand Down Expand Up @@ -604,8 +581,8 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
{
Subscription sender = senders.get(streamId);
if (sender != null) {
int n = RequestNFrameFlyweight.requestN(frame);
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
long n = RequestNFrameFlyweight.requestN(frame);
sender.request(n);
}
break;
}
Expand Down
33 changes: 25 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -300,12 +301,12 @@ private void handleFrame(ByteBuf frame) {
handleRequestN(streamId, frame);
break;
case REQUEST_STREAM:
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
long streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
Payload streamPayload = payloadDecoder.apply(frame);
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
break;
case REQUEST_CHANNEL:
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
long channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
Payload channelPayload = payloadDecoder.apply(frame);
handleChannel(streamId, channelPayload, channelInitialRequestN);
break;
Expand Down Expand Up @@ -436,14 +437,14 @@ protected void hookFinally(SignalType type) {
private void handleStream(
int streamId,
Flux<Payload> response,
int initialRequestN,
long initialRequestN,
@Nullable UnicastProcessor<Payload> requestChannel) {
final BaseSubscriber<Payload> subscriber =
new BaseSubscriber<Payload>() {

@Override
protected void hookOnSubscribe(Subscription s) {
s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
s.request(initialRequestN);
}

@Override
Expand Down Expand Up @@ -522,14 +523,30 @@ protected void hookFinally(SignalType type) {
.subscribe(subscriber);
}

private void handleChannel(int streamId, Payload payload, int initialRequestN) {
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
UnicastProcessor<Payload> frames = UnicastProcessor.create();
channelProcessors.put(streamId, frames);

Flux<Payload> payloads =
frames
.doOnRequest(
l -> sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, l)))
new LongConsumer() {
boolean first = true;

@Override
public void accept(long l) {
long n;
if (first) {
first = false;
n = l - 1L;
} else {
n = l;
}
if (n > 0) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
}
})
.doFinally(
signalType -> {
if (channelProcessors.remove(streamId, frames)) {
Expand Down Expand Up @@ -585,8 +602,8 @@ private void handleRequestN(int streamId, ByteBuf frame) {
Subscription subscription = sendingSubscriptions.get(streamId);

if (subscription != null) {
int n = RequestNFrameFlyweight.requestN(frame);
subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
long n = RequestNFrameFlyweight.requestN(frame);
subscription.request(n);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
header = frame.copy(frame.readerIndex(), FrameHeaderFlyweight.size());

if (frameType == FrameType.REQUEST_CHANNEL || frameType == FrameType.REQUEST_STREAM) {
int i = RequestChannelFrameFlyweight.initialRequestN(frame);
header.writeInt(i);
long i = RequestChannelFrameFlyweight.initialRequestN(frame);
header.writeInt(i > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) i);
}
putHeader(streamId, header);
}
Expand Down Expand Up @@ -261,10 +261,16 @@ void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf header) {
ByteBuf metadata;
CompositeByteBuf cm = removeMetadata(streamId);
if (cm != null) {
metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain());

ByteBuf decodedMetadata = PayloadFrameFlyweight.metadata(frame);
if (decodedMetadata != null) {
if (cm != null) {
metadata = cm.addComponents(true, decodedMetadata.retain());
} else {
metadata = PayloadFrameFlyweight.metadata(frame).retain();
}
} else {
metadata = PayloadFrameFlyweight.metadata(frame).retain();
metadata = cm != null ? cm : null;
}

ByteBuf data = assembleData(frame, streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,35 @@ private static int decodeLength(final ByteBuf byteBuf) {
return length;
}

static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) {
return allocator.compositeBuffer(2).addComponents(true, header, data);
}

static ByteBuf encode(
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) {

int length = metadata.readableBytes();
encodeLength(header, length);
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
}

static ByteBuf encode(ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) {
ByteBufAllocator allocator,
final ByteBuf header,
ByteBuf metadata,
boolean hasMetadata,
ByteBuf data) {

int length = metadata.readableBytes();
encodeLength(header, length);
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
}
final boolean addData = data != null && data.isReadable();
final boolean addMetadata = hasMetadata && metadata.isReadable();

static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
if (hasMetadata) {
int length = decodeLength(byteBuf);
return byteBuf.readSlice(length);
int length = metadata.readableBytes();
encodeLength(header, length);
}

if (addMetadata && addData) {
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
} else if (addMetadata) {
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
} else if (addData) {
return allocator.compositeBuffer(2).addComponents(true, header, data);
} else {
return Unpooled.EMPTY_BUFFER;
return header;
}
}

static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(6);
ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata);
byteBuf.resetReaderIndex();
return metadata;
static ByteBuf metadataWithoutMarking(ByteBuf byteBuf) {
int length = decodeLength(byteBuf);
return byteBuf.readSlice(length);
}

static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
Expand All @@ -78,12 +73,4 @@ static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
return Unpooled.EMPTY_BUFFER;
}
}

static ByteBuf data(ByteBuf byteBuf, boolean hasMetadata) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(6);
ByteBuf data = dataWithoutMarking(byteBuf, hasMetadata);
byteBuf.resetReaderIndex();
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ public static ByteBuf encode(
@Nullable ByteBuf metadata,
ByteBuf data) {

final boolean hasData = data != null && data.isReadable();
final boolean hasMetadata = metadata != null && metadata.isReadable();
final boolean hasMetadata = metadata != null;

int flags = FrameHeaderFlyweight.FLAGS_I;

Expand All @@ -25,15 +24,8 @@ public static ByteBuf encode(

final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags);
header.writeInt(extendedType);
if (hasData && hasMetadata) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
} else if (hasMetadata) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata);
} else if (hasData) {
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
} else {
return header;
}

return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
}

public static int extendedType(ByteBuf byteBuf) {
Expand Down Expand Up @@ -61,10 +53,13 @@ public static ByteBuf metadata(ByteBuf byteBuf) {
FrameHeaderFlyweight.ensureFrameType(FrameType.EXT, byteBuf);

boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf);
if (!hasMetadata) {
return null;
}
byteBuf.markReaderIndex();
// Extended type
byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES);
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata);
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf);
byteBuf.resetReaderIndex();
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,7 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B
public static ByteBuf encode(
final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) {

final boolean hasData = data != null && data.isReadable();
final boolean hasMetadata = metadata != null && metadata.isReadable();

if (hasData && hasMetadata) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
} else if (hasMetadata) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata);
} else if (hasData) {
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
} else {
return header;
}
final boolean hasMetadata = metadata != null;
return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static ByteBuf encode(

header.writeLong(lp);

return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
return DataAndMetadataFlyweight.encode(allocator, header, null, false, data);
}

public static boolean respondFlag(ByteBuf byteBuf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static ByteBuf encode(
@Nullable final ByteBuf metadata) {

final boolean hasMetadata = metadata != null;
final boolean writesMetadata = hasMetadata && metadata.isReadable();
final boolean addMetadata = hasMetadata && metadata.isReadable();

int flags = 0;

Expand All @@ -27,7 +27,7 @@ public static ByteBuf encode(
.writeInt(ttl)
.writeInt(numRequests);

if (writesMetadata) {
if (addMetadata) {
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
} else {
return header;
Expand Down
Loading

0 comments on commit 6f74ae9

Please sign in to comment.