Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api, core: support zero copy into protobuf #8102

Merged
merged 48 commits into from
May 14, 2021

Conversation

voidzcy
Copy link
Contributor

@voidzcy voidzcy commented Apr 20, 2021

A variant of #7330, taking the alternative codepath in protobuf.

Major changes of this PR:

  • Added a HasByteBuffer API that allows the marshaller to access the backing ByteBuffers directly (cherry-picked from protobuf, api, core, netty: zero copy into protobuf #7330).
  • Added a Detachable API that allows the application to take over the ownership of underlying buffers and close them later.

The new approach is to let the application implement a marshaller that passes ByteBuffers as immutable to protobuf:

This requires buffers to be alive all the time through the proto messages used by the application (compared to the normal codepath that makes a copy of the bytes when parsed to proto messages and recycle buffers right after). The application is responsible for managing the lifetime of underlying buffers. An example marshaller that enables zero-copy codepath can be something like the following:

  class ZeroCopyMarshaller<T extends Message> implements PrototypeMarshaller<T> {
    private IdentityHashMap<T, InputStream> unclosedStreams = new IdentityHashMap<>();
    private final Parser<T> parser;

    ZeroCopyMarshaller(T defaultInstance) {
      parser = (Parser<T>) defaultInstance.getParserForType();
    }

    // ... ...

    @Override
    public T parse(InputStream stream) {
      CodedInputStream cis = null;
      boolean streamDetached = false;
      if (stream instanceof Detachable) {
        stream = ((Detachable) stream).detach();
        streamDetached = true;
      }
      try {
        if (stream instanceof KnownLength) {
          if (stream instanceof HasByteBuffer
              && ((HasByteBuffer) stream).byteBufferSupported()) {  // fastest path, zero copy
            List<ByteString> byteStrings = new ArrayList<>();
            stream.mark(stream.available());  // retain bytes in ByteBuffers so that calling skip() won't throw bytes away
            while (stream.available() != 0) {
              ByteBuffer buffer = ((HasByteBuffer) stream).getByteBuffer();
              byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer));
              stream.skip(buffer.remaining());
            }
            stream.reset();
            cis = ByteString.copyFrom(byteStrings).newCodedInput();
          } else {
            // slightly slower path, copy into a byte array
            // see ProtoLiteUtils#MessageMarshaller
            // ...
          }
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      if (cis == null) {  // slowest path, via InputStream read
        cis = CodedInputStream.newInstance(stream);
      }

      // ... ...

      T message;
      try {
        message = parseFrom(cis);
      } catch (InvalidProtocolBufferException ipbe) {
        throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence").withCause(ipbe).asRuntimeException();
      }
      if (streamDetached) {
        unclosedStreams.put(message, stream);
      }
      return message;
    }

    private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
      T message = parser.parseFrom(stream);
      try {
        stream.checkLastTagWas(0);
        return message;
      } catch (InvalidProtocolBufferException e) {
        e.setUnfinishedMessage(message);
        throw e;
      }
    }

    // Application calls this method when no longer need the message.
    public void releaseMessage(T message) {
      InputStream stream = unclosedStreams.get(message);
      if (stream != null) {
        try {
          stream.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        unclosedStreams.remove(message);
      }
    }
  }

voidzcy added 30 commits August 16, 2020 19:21
@voidzcy voidzcy requested a review from ejona86 April 22, 2021 22:22
@voidzcy voidzcy force-pushed the impl/zero_copy_into_protobuf_3 branch from 1c95862 to 4d90800 Compare April 24, 2021 02:12
@voidzcy voidzcy added the kokoro:force-run Add this label to a PR to tell Kokoro to re-run all tests. Not generally necessary label Apr 27, 2021
@grpc-kokoro grpc-kokoro removed the kokoro:force-run Add this label to a PR to tell Kokoro to re-run all tests. Not generally necessary label Apr 27, 2021
@voidzcy voidzcy requested review from ejona86 and removed request for ejona86 April 27, 2021 22:30
@voidzcy
Copy link
Contributor Author

voidzcy commented May 7, 2021

@ejona86 Updated. PTAL and let me know if anything I am misunderstanding. Thanks you!

Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments for Detachable are probably best discussed during API review and see what people think. I think this looks pretty good. Main thing is I'd hope is we don't need all the if (detached) checks in the implementation.

core/src/main/java/io/grpc/internal/ReadableBuffers.java Outdated Show resolved Hide resolved
api/src/main/java/io/grpc/Detachable.java Outdated Show resolved Hide resolved
api/src/main/java/io/grpc/Detachable.java Outdated Show resolved Hide resolved
voidzcy added 2 commits May 11, 2021 19:07
… after being detached. Further operations on the detached BufferInputStream just delegate to the empty buffer.
@voidzcy
Copy link
Contributor Author

voidzcy commented May 14, 2021

Updated descriptions in this PR and ExperimentalApi links in the code. Please let me know if any doc needs improvement. Thanks for the review!

import java.io.InputStream;

/**
* A <i>Detachable</i> encapsulates some readable data source that can be detached and transferred
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to "throw the user a bone" here and give them a better hint why this exists. Let's say something about how the detached instance can outlive the original instance. And how it may be useful in a Marshaller to perform delayed deserialization or when combined with HasByteBuffer.

An really, let's rephrase the documentation to describe this as "An extension of InputStream." I think it will make it much more clear what is going on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

@veblush
Copy link
Contributor

veblush commented May 18, 2021

Thank you for the PR! My early benchmark shows that this can save ~20% of cpu time when used in the GCS benchmark client. (link)

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Aug 17, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants