Skip to content

Commit

Permalink
Restore optimizations for avoid allocating new buffer wrappers.
Browse files Browse the repository at this point in the history
  • Loading branch information
voidzcy committed Apr 24, 2021
1 parent bfe1d7d commit 4d90800
Showing 1 changed file with 57 additions and 21 deletions.
78 changes: 57 additions & 21 deletions core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
private int readableBytes;
private boolean marked;

public CompositeReadableBuffer(int initialCapacity) {
readableBuffers = new ArrayDeque<>(initialCapacity);
rewindableBuffers = new ArrayDeque<>(initialCapacity);
}

public CompositeReadableBuffer() {
readableBuffers = new ArrayDeque<>();
rewindableBuffers = new ArrayDeque<>();
Expand Down Expand Up @@ -156,20 +161,44 @@ public void readBytes(OutputStream dest, int length) throws IOException {
execute(STREAM_OP, length, dest, 0);
}

private static final NoThrowReadOperation<CompositeReadableBuffer> COMPOSITE_OP =
new NoThrowReadOperation<CompositeReadableBuffer>() {
@Override
public int read(ReadableBuffer buffer, int length, CompositeReadableBuffer dest,
int unused) {
dest.addBuffer(buffer.readBytes(length));
return 0;
}
};

@Override
public CompositeReadableBuffer readBytes(int length) {
final CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
executeNoThrow(COMPOSITE_OP, length, newBuffer, 0);
public ReadableBuffer readBytes(int length) {
if (length <= 0) {
return ReadableBuffers.empty();
}
checkReadable(length);
readableBytes -= length;

ReadableBuffer newBuffer = null;
CompositeReadableBuffer newComposite = null;
do {
ReadableBuffer buffer = readableBuffers.peek();
int readable = buffer.readableBytes();
ReadableBuffer readBuffer;
if (readable > length) {
readBuffer = buffer.readBytes(length);
length = 0;
} else {
if (marked) {
readBuffer = buffer.readBytes(readable);
advanceBuffer();
} else {
readBuffer = readableBuffers.poll();
}
length -= readable;
}
if (newBuffer == null) {
newBuffer = readBuffer;
} else {
if (newComposite == null) {
newComposite = new CompositeReadableBuffer(
length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
newComposite.addBuffer(newBuffer);
newBuffer = newComposite;
}
newComposite.addBuffer(readBuffer);
}
} while (length > 0);
return newBuffer;
}

Expand Down Expand Up @@ -276,15 +305,22 @@ private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, i
private void advanceBufferIfNecessary() {
ReadableBuffer buffer = readableBuffers.peek();
if (buffer.readableBytes() == 0) {
if (marked) {
rewindableBuffers.add(readableBuffers.remove());
ReadableBuffer next = readableBuffers.peek();
if (next != null) {
next.mark();
}
} else {
readableBuffers.remove().close();
advanceBuffer();
}
}

/**
* Removes one buffer from the front and closes it.
*/
private void advanceBuffer() {
if (marked) {
rewindableBuffers.add(readableBuffers.remove());
ReadableBuffer next = readableBuffers.peek();
if (next != null) {
next.mark();
}
} else {
readableBuffers.remove().close();
}
}

Expand Down

0 comments on commit 4d90800

Please sign in to comment.