-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gRPC streaming client. #1054
gRPC streaming client. #1054
Conversation
At a very high level (interface level), this approach LGTM. |
Do you want to get the API closer to the gRPC async stream API? The void* tag can be replaced by a callback which indicates the finishing of corresponding operation. |
@fengli79 I don't think it makes sense to use the exact gRPC async stream API, as what I'm doing is closer to the Envoy model for callbacks (declaring a callback object rather than supplying individual callbacks). With that said, I think there's useful stuff in the interface you point to wrt stream termination, status/error handling and metadata that we can use to make the |
include/envoy/grpc/async_client.h
Outdated
* TODO(htuch): Add support for metadata. | ||
* @param service_full_name gRPC service name. | ||
* @param method_name gRPC method name. | ||
* @param end_stream supplies whether this is a headers-only request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no headers-only request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, there's no headers-only request. However, the server side API allows to send back response as soon as request metadata is received. So, in theory, you can implement a grpc server like that.
include/envoy/grpc/async_client.h
Outdated
* @param request protobuf serializable message. | ||
* @param end_stream supplies whether this is the last message. | ||
*/ | ||
virtual void sendMessage(const RequestType& request, bool end_stream) PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably need a method to just send end_stream (empty DATA frame with end_stream)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
include/envoy/grpc/async_client.h
Outdated
* Called when an async gRPC stream is remotely closed with successful status. | ||
* @param metadata the trailing metadata. | ||
*/ | ||
virtual void onRemoteClose() PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is metadata
? also there should be a callback to handle initial metadata too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need following semantic on the context, means they are available during a grpc call:
getServerInitialMetadata()
getServerTrailingMetadata()
Need following semantic as a sender:
AddMetadata. (Modify the request metadata)
Write and its callback. (Write and the callback of the write operation)
WriteDone and its callback. (A half close from client side and done callback of the half close operation)
Write with Options and with its callback. (Send a message with half close all together, important for unary request performance, grpc uses a write operation to implement this semantic, you may seek for a better method name. Other options may be supported to set compression algorithm, etc.)
Need following semantic as a receiver:
onInitialMetadata. (Callback when receiving response metadata)
onMessage.
onFinish. (Whenever the grpc call is done, success or failure. Prefer to get a single point to handle finish.)
include/envoy/grpc/async_client.h
Outdated
* the handle can be used to send more messages or close the stream. | ||
*/ | ||
virtual AsyncClientStream<RequestType>* | ||
start(const std::string& service_full_name, const std::string& method_name, bool end_stream, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider take a protobuf descriptor pointer const google::protobuf::MethodDescriptor*
here instead of service name and method name, it should be easy to get from generated pb.h and it contains more information you might want to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide an example of how to extract this for https://github.com/lyft/envoy/blob/master/test/proto/helloworld.proto?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't worry, figured it out. helloworld::Greeter::descriptor()->FindMethodByName("SayHello");
include/envoy/grpc/status.h
Outdated
Internal = 13, | ||
Unavailable = 14, | ||
DataLoss = 15, | ||
InvalidCode = 16, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you decided copy this but not reusing protobuf (which is fine), this shouldn't be 16
. maybe -1
?
https://github.com/grpc/grpc/blob/master/include/grpc%2B%2B/impl/codegen/status_code_enum.h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is copied from #1067 - I will rebase on that when it merges and use whatever is there. Please direct feedback there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both Unauthenticated and InvalidCode get same value (16)?
I don't think you need InvalidCode, as if grpc expand the enum in the future, it introduces conflict for sure.
Also, please copy all the comments. And add one additional comment to indicate that this enum should be kept sync with grpc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in the other comment, please see #1067 for the status code bits that will be part of this final PR once it is non-WIP.
Http::AsyncClient::Stream* http_stream = | ||
cm_.httpAsyncClientForCluster(remote_cluster_name_) | ||
.start(*grpc_stream, Optional<std::chrono::milliseconds>(timeout)); | ||
grpc_stream->set_stream(http_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move this after the null check?
source/common/grpc/common.cc
Outdated
void Common::checkForHeaderOnlyError(Http::Message& http_response) { | ||
void Common::validateHeaders(const Http::HeaderMap& headers) { | ||
if (Http::Utility::getResponseStatus(headers) != enumToInt(Http::Code::OK)) { | ||
throw Exception(Optional<uint64_t>(), "non-200 response code"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should fail gracefully, not an exception.
http://www.grpc.io/docs/guides/wire.html
Implementations should expect broken deployments to send non-200 HTTP status codes in responses as well as a variety of non-GRPC content-types and to omit Status & Status-Message. Implementations must synthesize a Status & Status-Message to propagate to the application layer when this occurs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was here before. I'm actually going to undo my changes to this file, since I've inlined the relevant bits into async_client_impl.h
.
using testing::Invoke; | ||
using testing::Eq; | ||
using testing::NiceMock; | ||
// using testing::Ref; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove
for (const auto& frame : frames) { | ||
std::unique_ptr<ResponseType> response(new ResponseType()); | ||
if (frame.flags_ != GRPC_FH_DEFAULT || | ||
!response->ParseFromArray(frame.data_->linearize(frame.data_->length()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a TODO here for not do linearize. I will have a PR to convert Envoy Buffer
to protobuf ZeroCopyInputStream
include/envoy/grpc/async_client.h
Outdated
* @param request protobuf serializable message. | ||
* @param end_stream supplies whether this is the last message. | ||
*/ | ||
virtual void sendMessage(const RequestType& request, bool end_stream) PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
include/envoy/grpc/async_client.h
Outdated
* Called when an async gRPC stream is remotely closed with successful status. | ||
* @param metadata the trailing metadata. | ||
*/ | ||
virtual void onRemoteClose() PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need following semantic on the context, means they are available during a grpc call:
getServerInitialMetadata()
getServerTrailingMetadata()
Need following semantic as a sender:
AddMetadata. (Modify the request metadata)
Write and its callback. (Write and the callback of the write operation)
WriteDone and its callback. (A half close from client side and done callback of the half close operation)
Write with Options and with its callback. (Send a message with half close all together, important for unary request performance, grpc uses a write operation to implement this semantic, you may seek for a better method name. Other options may be supported to set compression algorithm, etc.)
Need following semantic as a receiver:
onInitialMetadata. (Callback when receiving response metadata)
onMessage.
onFinish. (Whenever the grpc call is done, success or failure. Prefer to get a single point to handle finish.)
include/envoy/grpc/async_client.h
Outdated
* TODO(htuch): Add support for metadata. | ||
* @param service_full_name gRPC service name. | ||
* @param method_name gRPC method name. | ||
* @param end_stream supplies whether this is a headers-only request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, there's no headers-only request. However, the server side API allows to send back response as soon as request metadata is received. So, in theory, you can implement a grpc server like that.
* Called when populating the headers to send with initial metadata. | ||
* @param metadata initial metadata reference. | ||
*/ | ||
virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"onCreate" is confusing here. The method name should be able to represent when this method is going to be invoked.
I guess it should be invoked right after the request initial metadata gets sent?
If it's intended to be invoked before the request initial metadata gets sent, seems we don't need a callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is before the initial metadata is sent, and provides an opportunity to fill in the metadata. This is similar to how RpcChannel
works today, see https://github.com/lyft/envoy/blob/master/include/envoy/grpc/rpc_channel.h#L30.
include/envoy/grpc/async_client.h
Outdated
/** | ||
* Finish the stream. No further operations are permitted on the stream. | ||
*/ | ||
virtual void finish() PURE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment looks not right. The client is still be able to receive data on the stream. This is more like a half close on the request, the response processing should not be impacted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, what was there before was confusing, in particular if we compare with the Finish semantics in the gRPC C++ library, which is a full close. In the model we're adopting in AsyncClient
here, there's independent local/remote closing of streams. I will rename this to close
to make the semantics clearer.
remote_cluster_name_, service_method.service()->full_name(), service_method.name()); | ||
callbacks.onCreateInitialMetadata(message->headers()); | ||
|
||
http_stream->sendHeaders(message->headers(), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expose sendHeaders via grpc_stream. Sometimes, the initial metadata need to be prepared in an async routine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The onCreateInitialMetadata
method in callbacks provides this facility.
|
||
// Http::AsyncClient::StreamCallbacks | ||
void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { | ||
if (remote_closed_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the headers should be still be delivered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I seems that once we've invoked onRemoteClose(..)
on callbacks (which is implied by remote_close_
), we shouldn't be delivering any further response callbacks as the logical stream is now broken. We're basically waiting for the client owner to invoke finish()
so we can mop up all the stream resources. I'll try and add some more comments to make this clearer in the interface docs.
return; | ||
} | ||
if (end_stream) { | ||
onTrailers(std::move(headers)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should fake trailers when headers are received actually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http://www.grpc.io/docs/guides/wire.html refers to this as a Trailers-Only response.
std::unique_ptr<ResponseType> response(new ResponseType()); | ||
// TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements | ||
// protobuf ZeroCopyInputStream. | ||
if (frame.flags_ != GRPC_FH_DEFAULT || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The response can be compressed. So, it can be GRPC_FH_DEFAULT | compressed flag.
Please add a TODO here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
} | ||
|
||
const GrpcStatus grpc_status = Common::getGrpcStatus(*trailers); | ||
if (grpc_status == GrpcStatus::InvalidCode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove InvalidCode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what the ask is here, can you elaborate some more? The status codes are not part of this PR, I temporarily patched in an earlier revision of #1067, which I plan to merge back in here after it hits master. In the most recent #1067, if grpc_status < 0
in this scenario, it indicates a missing/invalid code, so I will update this logic to reflect that.
test/test_common/utility.cc
Outdated
@@ -64,6 +65,11 @@ std::string TestUtility::bufferToString(const Buffer::Instance& buffer) { | |||
return output; | |||
} | |||
|
|||
std::string TestUtility::bufferToHexString(const Buffer::Instance& buffer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this func used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but it was useful in the test development. I might split this out into a separate PR.
|
||
const std::string HELLO_REQUEST = "ABC"; | ||
// We expect the 5 byte header to only have a length of 5 indicating the size of the protobuf. The | ||
// protobuf beings with 0xa, indicating this is the first field of type string. This is followed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
begins with x0a
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please fix.
s/beings with 0xa/begins with x0a
const char HELLO_REQUEST_DATA[] = "\x00\x00\x00\x00\x05\x0a\x03\x41\x42\x43"; | ||
const size_t HELLO_REQUEST_SIZE = sizeof(HELLO_REQUEST_DATA) - 1; | ||
|
||
const std::string HELLO_REPLY = "CBA"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would pick HELLO_REPLY as something like 'DEFG', so HELLO_REPLY_SIZE will be different with HELLO_REQUEST_SIZE. And the content will be different as well.
This is a templatized gRPC client than can be used to implement unary/streaming (client, server, bidi) for arbitrary protobuf request/response messages. This will be used by the v2 xDS API clients for gRPC streams.
@fengli79 @wattli @lizan @mattklein123 This is now ready for review. |
void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { | ||
ASSERT(!remote_closed_); | ||
if (Http::Utility::getResponseStatus(*headers) != enumToInt(Http::Code::OK)) { | ||
streamError(Status::GrpcStatus::Internal); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The HTTP code to gRPC status code mapping is defined here
https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
We should follow this for non-2xx HTTP codes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks great. few things.
@@ -0,0 +1,113 @@ | |||
#pragma once |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general comment. Can we open issue to track deprecating RpcChannel? This is superior and we should just get rid of the other code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, see #1102.
* Start a gRPC stream asynchronously. | ||
* @param service_method protobuf descriptor of gRPC service method. | ||
* @param callbacks the callbacks to be notified of stream status. | ||
* @param timeout supplies the stream timeout, measured since when the frame with end_stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure off the top of my head how to fix this yet, but calling out that I'm not sure this definition of timeout makes sense for a streaming RPC. Not sure if we want to leave this here, or split interface somehow for streaming and unary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was cribbing from the Http::AsyncClient
comments here for stream start()
. @lizan Do you know what the intended behavior is for Http::AsyncClient
here, I can't figure it out, seems to be just setting a whole stream timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had previous discussion around this. The intended behavior for Http::AsyncClient
is what you have comment here, since it is also used for unary. I think this only make sense for unary calls, so if you want use the AsyncClient for unary call too, probably just leave this here and comment it so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I think it's fine for now, we can revisit later, we will only be using it with infinite timeout for the APIs.
remote_cluster_name_, service_method.service()->full_name(), service_method.name()); | ||
callbacks.onCreateInitialMetadata(message->headers()); | ||
|
||
http_stream->sendHeaders(message->headers(), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't looked at this code in a while, but fairly certain that sendHeaders() can result in an inline reset. Will the code work in that case or does the logic below need to be modified to potentially detect and return nullptr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if I see that in the current implementation (it can do a closeLocal()
if end_stream
is true, but it isn't here). I'll add handling to be defensive though, since the interface allows this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you send headers, it's possible to get an immediate reset for example due to http/2 stream exhaustion or circuit breaking. I think that case, you want to return nullptr and raise a callback immediately?
|
||
void onData(Buffer::Instance& data, bool end_stream) override { | ||
ASSERT(!remote_closed_); | ||
std::vector<Frame> frames; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as a slight perf improvement might consider making this a class variable. In the streaming case, if we clear the frames after dispatch, I think we can retain the capacity and then use it again later (I think). Not a huge deal but though I would point it out.
} | ||
callbacks_.onReceiveMessage(std::move(response)); | ||
} | ||
if (end_stream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is a protocol failure, should we check this before even trying to do the decode up above?
#include "gtest/gtest.h" | ||
|
||
namespace Envoy { | ||
using testing::_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: in the rest of the code we have been putting the using statements outside of the Envoy namespace.
@@ -388,6 +388,14 @@ class MockAsyncClientRequest : public AsyncClient::Request { | |||
MockAsyncClient* client_; | |||
}; | |||
|
|||
class MockAsyncClientStream : public AsyncClient::Stream { | |||
public: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
per gmock compile perf guidelines can we declare constructor/deconstructor here and then define in .cc file. For other mocks in this PR, I'm not sure if you can do that with template mocks, but if you can we should do that (probably not).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I don't think this will work with the templates unless we instantiate for every type that is used.
@lizan Can you comment on the HTTP/gRPC async client timeout question above? If I don't get any further feedback, I'll merge and followup in a separate issue (in the interest of moving the v2 API implementation along). |
This is a templatized gRPC client than can be used to implement unary/streaming (client, server,
bidi) for arbitrary protobuf request/response messages. This will be used by the v2 xDS API clients
for gRPC streams.