Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Payload#hasMetadata to strictly match the flag in the frame #783

Merged
merged 8 commits into from
Apr 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
Expand All @@ -40,7 +41,8 @@ public boolean hasMetadata() {

@Override
public ByteBuf sliceMetadata() {
return SetupFrameFlyweight.metadata(setupFrame);
final ByteBuf metadata = SetupFrameFlyweight.metadata(setupFrame);
return metadata == null ? Unpooled.EMPTY_BUFFER : metadata;
}

@Override
Expand Down
52 changes: 13 additions & 39 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,8 @@ private Mono<Void> handleFireAndForget(Payload payload) {
return UnicastMonoEmpty.newInstance(
() -> {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
payload.release();
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(requestFrame);
});
Expand All @@ -245,13 +240,8 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
@Override
public void doOnSubscribe() {
final ByteBuf requestFrame =
RequestResponseFrameFlyweight.encode(
allocator,
streamId,
false,
payload.sliceMetadata().retain(),
payload.sliceData().retain());
payload.release();
RequestResponseFrameFlyweight.encodeReleasingPayload(
allocator, streamId, payload);

sendProcessor.onNext(requestFrame);
}
Expand Down Expand Up @@ -302,16 +292,10 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
public void accept(long n) {
if (firstRequest && !receiver.isDisposed()) {
firstRequest = false;
sendProcessor.onNext(
RequestStreamFrameFlyweight.encode(
allocator,
streamId,
false,
n,
payload.sliceMetadata().retain(),
payload.sliceData().retain()));
if (!payloadReleasedFlag.getAndSet(true)) {
payload.release();
sendProcessor.onNext(
RequestStreamFrameFlyweight.encodeReleasingPayload(
allocator, streamId, n, payload));
}
} else if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
Expand Down Expand Up @@ -400,10 +384,9 @@ protected void hookOnNext(Payload payload) {
return;
}
final ByteBuf frame =
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload);

sendProcessor.onNext(frame);
payload.release();
}

@Override
Expand Down Expand Up @@ -444,18 +427,10 @@ public void accept(long n) {
.subscribe(upstreamSubscriber);
if (!payloadReleasedFlag.getAndSet(true)) {
ByteBuf frame =
RequestChannelFrameFlyweight.encode(
allocator,
streamId,
false,
false,
n,
initialPayload.sliceMetadata().retain(),
initialPayload.sliceData().retain());
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

sendProcessor.onNext(frame);

initialPayload.release();
}
} else {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
Expand Down Expand Up @@ -497,8 +472,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
return UnicastMonoEmpty.newInstance(
() -> {
ByteBuf metadataPushFrame =
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
payload.release();
MetadataPushFrameFlyweight.encodeReleasingPayload(allocator, payload);

sendProcessor.onNextPrioritized(metadataPushFrame);
});
Expand Down Expand Up @@ -604,8 +578,8 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
{
Subscription sender = senders.get(streamId);
if (sender != null) {
int n = RequestNFrameFlyweight.requestN(frame);
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
long n = RequestNFrameFlyweight.requestN(frame);
sender.request(n);
}
break;
}
Expand Down
39 changes: 12 additions & 27 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ private void handleFrame(ByteBuf frame) {
handleRequestN(streamId, frame);
break;
case REQUEST_STREAM:
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
long streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
Payload streamPayload = payloadDecoder.apply(frame);
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
break;
case REQUEST_CHANNEL:
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
long channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
Payload channelPayload = payloadDecoder.apply(frame);
handleChannel(streamId, channelPayload, channelInitialRequestN);
break;
Expand Down Expand Up @@ -399,16 +399,9 @@ protected void hookOnNext(Payload payload) {
return;
}

ByteBuf byteBuf;
try {
byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload);
} catch (Throwable t) {
payload.release();
throw Exceptions.propagate(t);
}

payload.release();

ByteBuf byteBuf =
PayloadFrameFlyweight.encodeNextCompleteReleasingPayload(
allocator, streamId, payload);
sendProcessor.onNext(byteBuf);
}

Expand Down Expand Up @@ -437,14 +430,14 @@ protected void hookFinally(SignalType type) {
private void handleStream(
int streamId,
Flux<Payload> response,
int initialRequestN,
long initialRequestN,
@Nullable UnicastProcessor<Payload> requestChannel) {
final BaseSubscriber<Payload> subscriber =
new BaseSubscriber<Payload>() {

@Override
protected void hookOnSubscribe(Subscription s) {
s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
s.request(initialRequestN);
}

@Override
Expand All @@ -469,16 +462,8 @@ protected void hookOnNext(Payload payload) {
return;
}

ByteBuf byteBuf;
try {
byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);
} catch (Throwable t) {
payload.release();
throw Exceptions.propagate(t);
}

payload.release();

ByteBuf byteBuf =
PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload);
sendProcessor.onNext(byteBuf);
}

Expand Down Expand Up @@ -523,7 +508,7 @@ protected void hookFinally(SignalType type) {
.subscribe(subscriber);
}

private void handleChannel(int streamId, Payload payload, int initialRequestN) {
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
UnicastProcessor<Payload> frames = UnicastProcessor.create();
channelProcessors.put(streamId, frames);

Expand Down Expand Up @@ -602,8 +587,8 @@ private void handleRequestN(int streamId, ByteBuf frame) {
Subscription subscription = sendingSubscriptions.get(streamId);

if (subscription != null) {
int n = RequestNFrameFlyweight.requestN(frame);
subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
long n = RequestNFrameFlyweight.requestN(frame);
subscription.request(n);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
header = frame.copy(frame.readerIndex(), FrameHeaderFlyweight.size());

if (frameType == FrameType.REQUEST_CHANNEL || frameType == FrameType.REQUEST_STREAM) {
int i = RequestChannelFrameFlyweight.initialRequestN(frame);
header.writeInt(i);
long i = RequestChannelFrameFlyweight.initialRequestN(frame);
header.writeInt(i > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) i);
}
putHeader(streamId, header);
}
Expand Down Expand Up @@ -261,10 +261,16 @@ void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf header) {
ByteBuf metadata;
CompositeByteBuf cm = removeMetadata(streamId);
if (cm != null) {
metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain());

ByteBuf decodedMetadata = PayloadFrameFlyweight.metadata(frame);
if (decodedMetadata != null) {
if (cm != null) {
metadata = cm.addComponents(true, decodedMetadata.retain());
} else {
metadata = PayloadFrameFlyweight.metadata(frame).retain();
}
} else {
metadata = PayloadFrameFlyweight.metadata(frame).retain();
metadata = cm != null ? cm : null;
}

ByteBuf data = assembleData(frame, streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,35 @@ private static int decodeLength(final ByteBuf byteBuf) {
return length;
}

static ByteBuf encodeOnlyMetadata(
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) {
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
}

static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) {
return allocator.compositeBuffer(2).addComponents(true, header, data);
}

static ByteBuf encode(
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) {
ByteBufAllocator allocator,
final ByteBuf header,
ByteBuf metadata,
boolean hasMetadata,
ByteBuf data) {

int length = metadata.readableBytes();
encodeLength(header, length);
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
}
final boolean addData = data != null && data.isReadable();
final boolean addMetadata = hasMetadata && metadata.isReadable();

static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
if (hasMetadata) {
int length = decodeLength(byteBuf);
return byteBuf.readSlice(length);
int length = metadata.readableBytes();
encodeLength(header, length);
}

if (addMetadata && addData) {
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
} else if (addMetadata) {
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
} else if (addData) {
return allocator.compositeBuffer(2).addComponents(true, header, data);
} else {
return Unpooled.EMPTY_BUFFER;
return header;
}
}

static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(6);
ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata);
byteBuf.resetReaderIndex();
return metadata;
static ByteBuf metadataWithoutMarking(ByteBuf byteBuf) {
int length = decodeLength(byteBuf);
return byteBuf.readSlice(length);
}

static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
Expand All @@ -76,12 +73,4 @@ static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
return Unpooled.EMPTY_BUFFER;
}
}

static ByteBuf data(ByteBuf byteBuf, boolean hasMetadata) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(6);
ByteBuf data = dataWithoutMarking(byteBuf, hasMetadata);
byteBuf.resetReaderIndex();
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,18 @@ public static ByteBuf encode(
@Nullable ByteBuf metadata,
ByteBuf data) {

final boolean hasMetadata = metadata != null;

int flags = FrameHeaderFlyweight.FLAGS_I;

if (metadata != null) {
if (hasMetadata) {
flags |= FrameHeaderFlyweight.FLAGS_M;
}

ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags);
final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags);
header.writeInt(extendedType);
if (data == null && metadata == null) {
return header;
} else if (metadata != null) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
} else {
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
}

return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
}

public static int extendedType(ByteBuf byteBuf) {
Expand Down Expand Up @@ -56,10 +53,13 @@ public static ByteBuf metadata(ByteBuf byteBuf) {
FrameHeaderFlyweight.ensureFrameType(FrameType.EXT, byteBuf);

boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf);
if (!hasMetadata) {
return null;
}
byteBuf.markReaderIndex();
// Extended type
byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES);
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata);
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf);
byteBuf.resetReaderIndex();
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B
public static ByteBuf encode(
final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) {

if (data == null && metadata == null) {
return header;
} else if (metadata != null) {
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
} else {
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
}
final boolean hasMetadata = metadata != null;
return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static ByteBuf encode(

header.writeLong(lp);

return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
return DataAndMetadataFlyweight.encode(allocator, header, null, false, data);
}

public static boolean respondFlag(ByteBuf byteBuf) {
Expand Down
Loading