Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api, core: support zero copy into protobuf #8102

Merged
merged 48 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
9936df1
Add mark&reset methods and canUseByteBuffer&getByteBuffer methods to …
voidzcy Aug 17, 2020
aa93e1d
Default implementations.
voidzcy Aug 17, 2020
454e224
Wire new methods for forwarder
voidzcy Aug 17, 2020
3f47d5e
Support mark&reset and retrieving content via ByteBuffer for netty.
voidzcy Aug 17, 2020
b3c9974
Implementation for composite.
voidzcy Aug 17, 2020
2ede525
Define interface for accessing readable content via ByteBuffers.
voidzcy Aug 17, 2020
3dca312
Implement mark&reset for simple readable buffers.
voidzcy Aug 17, 2020
6249353
Use HasByteBuffer interface for accesing input stream's backing ByteB…
voidzcy Aug 17, 2020
29dce67
Eliminate the length argument for retrieving the ByteBuffer.
voidzcy Aug 17, 2020
22ea6c3
Do no require netty buffer to be direct from API's perspective.
voidzcy Aug 17, 2020
53e347c
Use Deque operations to avoid unncessary moves.
voidzcy Aug 17, 2020
27801fe
Make a list of ByteBuffers up-front instead of a running iterator.
voidzcy Aug 17, 2020
eb71a68
Add getByteBufferSupported method for HasByteBuffer so that it can be…
voidzcy Aug 17, 2020
5d3c657
It's not necessary to implement getByteBuffer for ByteReadbaleBufferW…
voidzcy Aug 17, 2020
9fd8d3c
Add test coverage for mark&reset and getByteBuffer for generic ByteBu…
voidzcy Aug 17, 2020
e3afe50
Add test coverage for netty's special get NIO bytebuffer operation.
voidzcy Aug 17, 2020
e2fdd07
Skip test for operations not supported by okhttp.
voidzcy Aug 17, 2020
033270b
Add test coverage for BufferInputStream with getByteBuffer operation.
voidzcy Aug 17, 2020
0622d51
Add test using a known-length input stream with getByteBuffer operati…
voidzcy Aug 17, 2020
0e8caee
Modify test method name.
voidzcy Aug 17, 2020
ba4e91b
Add test coverage for mark&reset and getByteBuffer for CompositeReada…
voidzcy Aug 17, 2020
69618b2
Add getByteBuffer support for ByteReadableBufferWrapper.
voidzcy Aug 20, 2020
437857d
Only pull ByteBuffers when message is large.
voidzcy Aug 21, 2020
1363505
Run ByteBuffer codepath only in Java 9+.
voidzcy Aug 28, 2020
c46eb73
Slight improvement for avoiding array creation if not necessary.
voidzcy Aug 28, 2020
7b4e070
Merge branch 'master' of github.com:grpc/grpc-java into impl/zero_cop…
voidzcy Aug 28, 2020
772b3ba
Change ReadableBuffer#canUseByteBuffer to hasByteBuffer.
voidzcy Sep 1, 2020
b1c99e5
Removed unnecessary reset.
voidzcy Sep 1, 2020
692076c
Simplify checking runtime java version.
voidzcy Sep 1, 2020
10c13b8
Add ExperimentalApi annotation.
voidzcy Sep 1, 2020
f13c165
Rename ReadableBuffer#hasByteBuffer to getByteBufferSupported.
voidzcy Sep 2, 2020
5c562b3
Merge branch 'master' of github.com:grpc/grpc-java into impl/zero_cop…
voidzcy Apr 20, 2021
0f804b7
Revert changes for MessageMarshaller.
voidzcy Apr 20, 2021
ad64a1d
Add Retainable interface that allows taking over resource ownership a…
voidzcy Apr 20, 2021
aa902aa
Make BufferInputStream implements Retainable. Its close() method beco…
voidzcy Apr 20, 2021
6cf0739
Remove no longer needed constructors.
voidzcy Apr 22, 2021
bfe1d7d
Change return type to be more specific.
voidzcy Apr 22, 2021
4d90800
Restore optimizations for avoid allocating new buffer wrappers.
voidzcy Apr 24, 2021
d8d030a
Optimize by allocating rewindable buffer deque lazily.
voidzcy Apr 27, 2021
f1f1c95
Change to Detachable API, which makes the original InputStream behave…
voidzcy May 6, 2021
a2f3a9e
Change naming for getByteBufferSupported().
voidzcy May 6, 2021
61c3447
Eliminate generics, use Object and casts instead.
voidzcy May 6, 2021
15a0314
Return Detachable instead of Object.
voidzcy May 6, 2021
f8afce5
Hook BufferInputStream's markSupported() with underlying buffers.
voidzcy May 6, 2021
2a6cbb6
Update Detachable interface definition, make it more specific to Inpu…
voidzcy May 12, 2021
e534f4a
Replace the internal buffer of BufferInputStream with an empty buffer…
voidzcy May 12, 2021
ad8940f
Add ExperimentalApi link.
voidzcy May 14, 2021
a6c1b13
Update Javadoc for Detachable.
voidzcy May 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions api/src/main/java/io/grpc/Detachable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* A <i>Detachable</i> encapsulates an object that can be forked with underlying resources
* detached and transferred to a new instance. The forked instance takes over the ownership
* of resources and is responsible for releasing after use. The forked instance preserves
* states of detached resources. Resources can be consumed through the forked instance as if
* being continually consumed through the original instance. The original instance discards
* states of detached resources and is no longer consumable as if the resources are exhausted.
*/
@ExperimentalApi("TODO")
public interface Detachable {

/**
* Fork a new instance with underlying resources detached from this instance and
* transferred to the new instance.
*
* @throws IllegalStateException if the underlying resources have already been detached.
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
*/
public Detachable detach();
voidzcy marked this conversation as resolved.
Show resolved Hide resolved
}
52 changes: 52 additions & 0 deletions api/src/main/java/io/grpc/HasByteBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* Extension to an {@link java.io.InputStream} whose content can be accessed as {@link
* ByteBuffer}s.
*
* <p>This can be used for optimizing the case for the consumer of a {@link ByteBuffer}-backed
* input stream supports efficient reading from {@link ByteBuffer}s directly. This turns the reader
* interface from an {@link java.io.InputStream} to {@link ByteBuffer}s, without copying the
* content to a byte array and read from it.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7387")
public interface HasByteBuffer {

/**
* Indicates whether or not {@link #getByteBuffer} operation is supported.
*/
boolean byteBufferSupported();

/**
* Gets a {@link ByteBuffer} containing some bytes of the content next to be read, or {@code
* null} if has reached end of the content. The number of bytes contained in the returned buffer
* is implementation specific. Calling this method does not change the position of the input
* stream. The returned buffer's content should not be modified, but the position, limit, and
* mark may be changed. Operations for changing the position, limit, and mark of the returned
* buffer does not affect the position, limit, and mark of this input stream. This is an optional
* method, so callers should first check {@link #byteBufferSupported}.
*
* @throws UnsupportedOperationException if this operation is not supported.
*/
@Nullable
ByteBuffer getByteBuffer();
}
25 changes: 25 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.internal;

import java.nio.ByteBuffer;

/**
* Abstract base class for {@link ReadableBuffer} implementations.
*/
Expand Down Expand Up @@ -45,6 +47,29 @@ public int arrayOffset() {
throw new UnsupportedOperationException();
}

@Override
public boolean markSupported() {
return false;
}

@Override
public void mark() {}

@Override
public void reset() {
throw new UnsupportedOperationException();
}

@Override
public boolean byteBufferSupported() {
return false;
}

@Override
public ByteBuffer getByteBuffer() {
throw new UnsupportedOperationException();
}

@Override
public void close() {}

Expand Down
137 changes: 118 additions & 19 deletions core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Deque;
import javax.annotation.Nullable;

/**
* A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
Expand All @@ -33,15 +35,17 @@
*/
public class CompositeReadableBuffer extends AbstractReadableBuffer {

private final Deque<ReadableBuffer> readableBuffers;
private Deque<ReadableBuffer> rewindableBuffers;
private int readableBytes;
private final Queue<ReadableBuffer> buffers;
private boolean marked;

public CompositeReadableBuffer(int initialCapacity) {
buffers = new ArrayDeque<>(initialCapacity);
readableBuffers = new ArrayDeque<>(initialCapacity);
}

public CompositeReadableBuffer() {
buffers = new ArrayDeque<>();
readableBuffers = new ArrayDeque<>();
}

/**
Expand All @@ -51,16 +55,24 @@ public CompositeReadableBuffer() {
* this {@code CompositeBuffer}.
*/
public void addBuffer(ReadableBuffer buffer) {
boolean markHead = marked && readableBuffers.isEmpty();
enqueueBuffer(buffer);
if (markHead) {
readableBuffers.peek().mark();
}
}

private void enqueueBuffer(ReadableBuffer buffer) {
if (!(buffer instanceof CompositeReadableBuffer)) {
buffers.add(buffer);
readableBuffers.add(buffer);
readableBytes += buffer.readableBytes();
return;
}

CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
while (!compositeBuffer.buffers.isEmpty()) {
ReadableBuffer subBuffer = compositeBuffer.buffers.remove();
buffers.add(subBuffer);
while (!compositeBuffer.readableBuffers.isEmpty()) {
ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
readableBuffers.add(subBuffer);
}
readableBytes += compositeBuffer.readableBytes;
compositeBuffer.readableBytes = 0;
Expand Down Expand Up @@ -158,22 +170,27 @@ public ReadableBuffer readBytes(int length) {
ReadableBuffer newBuffer = null;
CompositeReadableBuffer newComposite = null;
do {
ReadableBuffer buffer = buffers.peek();
ReadableBuffer buffer = readableBuffers.peek();
int readable = buffer.readableBytes();
ReadableBuffer readBuffer;
if (readable > length) {
readBuffer = buffer.readBytes(length);
length = 0;
} else {
readBuffer = buffers.poll();
if (marked) {
readBuffer = buffer.readBytes(readable);
advanceBuffer();
} else {
readBuffer = readableBuffers.poll();
}
length -= readable;
}
if (newBuffer == null) {
newBuffer = readBuffer;
} else {
if (newComposite == null) {
newComposite =
new CompositeReadableBuffer(length == 0 ? 2 : Math.min(buffers.size() + 2, 16));
newComposite = new CompositeReadableBuffer(
length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
newComposite.addBuffer(newBuffer);
newBuffer = newComposite;
}
Expand All @@ -183,10 +200,77 @@ public ReadableBuffer readBytes(int length) {
return newBuffer;
}

@Override
public boolean markSupported() {
for (ReadableBuffer buffer : readableBuffers) {
if (!buffer.markSupported()) {
return false;
}
}
return true;
}

@Override
public void mark() {
if (rewindableBuffers == null) {
rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
}
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
marked = true;
ReadableBuffer buffer = readableBuffers.peek();
if (buffer != null) {
buffer.mark();
}
}

@Override
public void reset() {
if (!marked) {
throw new InvalidMarkException();
}
ReadableBuffer buffer;
if ((buffer = readableBuffers.peek()) != null) {
int currentRemain = buffer.readableBytes();
buffer.reset();
readableBytes += (buffer.readableBytes() - currentRemain);
}
while ((buffer = rewindableBuffers.pollLast()) != null) {
buffer.reset();
readableBuffers.addFirst(buffer);
readableBytes += buffer.readableBytes();
}
}

@Override
public boolean byteBufferSupported() {
for (ReadableBuffer buffer : readableBuffers) {
if (!buffer.byteBufferSupported()) {
return false;
}
}
return true;
}

@Nullable
@Override
public ByteBuffer getByteBuffer() {
if (readableBuffers.isEmpty()) {
return null;
}
return readableBuffers.peek().getByteBuffer();
}

@Override
public void close() {
while (!buffers.isEmpty()) {
buffers.remove().close();
while (!readableBuffers.isEmpty()) {
readableBuffers.remove().close();
}
if (rewindableBuffers != null) {
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
}
}

Expand All @@ -197,12 +281,12 @@ public void close() {
private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
checkReadable(length);

if (!buffers.isEmpty()) {
if (!readableBuffers.isEmpty()) {
advanceBufferIfNecessary();
}

for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
ReadableBuffer buffer = buffers.peek();
for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
ReadableBuffer buffer = readableBuffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes());

// Perform the read operation for this buffer.
Expand Down Expand Up @@ -232,9 +316,24 @@ private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, i
* If the current buffer is exhausted, removes and closes it.
*/
private void advanceBufferIfNecessary() {
ReadableBuffer buffer = buffers.peek();
ReadableBuffer buffer = readableBuffers.peek();
if (buffer.readableBytes() == 0) {
buffers.remove().close();
advanceBuffer();
}
}

/**
* Removes one buffer from the front and closes it.
*/
private void advanceBuffer() {
if (marked) {
rewindableBuffers.add(readableBuffers.remove());
ReadableBuffer next = readableBuffers.peek();
if (next != null) {
next.mark();
}
} else {
readableBuffers.remove().close();
}
}

Expand Down
27 changes: 27 additions & 0 deletions core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* Base class for a wrapper around another {@link ReadableBuffer}.
Expand Down Expand Up @@ -96,6 +97,32 @@ public int arrayOffset() {
return buf.arrayOffset();
}

@Override
public boolean markSupported() {
return buf.markSupported();
}

@Override
public void mark() {
buf.mark();
}

@Override
public void reset() {
buf.reset();
}

@Override
public boolean byteBufferSupported() {
return buf.byteBufferSupported();
}

@Nullable
@Override
public ByteBuffer getByteBuffer() {
return buf.getByteBuffer();
}

@Override
public void close() {
buf.close();
Expand Down
Loading