Extend AsyncClient to support streaming requests / responses #353
Conversation
@lizan I'm OOO tomorrow, will take a look next week when all tests are in place. At a very high level the approach is fine.
test/mocks/http/mocks.h
Outdated
@@ -294,6 +294,14 @@ class MockAsyncClient : public AsyncClient {

  MOCK_METHOD3(send_, Request*(MessagePtr& request, Callbacks& callbacks,
                               const Optional<std::chrono::milliseconds>& timeout));

  Stream* start(StreamCallbacks& callbacks,
no need to override the method, you can simply

MOCK_METHOD2(start, Stream*(StreamCallbacks& callbacks,
             const Optional<std::chrono::milliseconds>& timeout));

this should work just fine
public:
  AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
                   const Optional<std::chrono::milliseconds>& timeout);
  virtual ~AsyncRequestImpl();
hm, AsyncRequestImpl is final, so there should not be a need for a virtual destructor
Router::ProdFilter router_;
std::function<void()> reset_callback_;
AccessLog::RequestInfoImpl request_info_;
RouteImpl route_;
bool complete_{};
Buffer::Instance* decoding_buffer_{nullptr};
bool internal_header_inserted_{false};
nit: bool internal_header_inserted_{};
same for decoding_buffer_{};
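This nit works because empty braces value-initialize a non-static data member, so `{}` is equivalent to the explicit `{false}` / `{nullptr}` forms. A minimal standalone sketch (the `StreamState` struct is hypothetical, with `int*` standing in for `Buffer::Instance*`):

```cpp
#include <cassert>

// Hypothetical struct mirroring the members under review: empty braces
// value-initialize, so {} is equivalent to the explicit {false} / {nullptr}.
struct StreamState {
  bool internal_header_inserted_{}; // value-initialized to false
  int* decoding_buffer_{};          // value-initialized to nullptr
};
```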
include/envoy/http/async_client.h
Outdated
@@ -50,6 +83,39 @@ class AsyncClient {
  virtual void cancel() PURE;
};

/**
 * An in-flight HTTP stream
super nit: full stop here and down below for comments on methods.
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
                                            const Optional<std::chrono::milliseconds>& timeout) {
  AsyncStreamImpl* async_request = new AsyncStreamImpl(*this, callbacks, timeout);
  std::unique_ptr<AsyncStreamImpl> new_request{async_request};
nit: combine into one line:

std::unique_ptr<AsyncStreamImpl> new_request(new AsyncStreamImpl(*this, callbacks, timeout));
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
}
nit: new line in between
This is cool. Just wondering if this stream can be shared by multiple threads: 1) can multiple threads write data to this stream? 2) When response data is received, on which thread will the callback be invoked?
@qiwzhang no, as written, AsyncClient is thread local and cannot be safely shared between threads. It would be possible to surround it with enough locking to make it safe, but my general recommendation would be to reframe your problem as a thread local problem. I have not yet found a problem that cannot be solved cleanly with this pattern. Happy to discuss this offline with you.
@mattklein123 Thanks for your quick response. In this case, istio/proxy will need to create N gRPC streams to istio/mixer if it has N threads. I feel it would be better if they could share the same gRPC stream, since some transport optimizations of the mixer API are done based on the gRPC stream. We will go with one stream per thread for now to make things easier, and revisit this later if performance is an issue.
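The thread-local pattern described above can be sketched in isolation (all names here are hypothetical stand-ins, not Envoy APIs): each worker thread lazily creates its own client, so no locking is needed and callbacks always fire on the owning thread.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for a per-thread async client.
struct FakeClient {
  int requests_sent{0};
};

// Each thread lazily gets its own instance; no locking needed.
FakeClient& threadLocalClient() {
  thread_local FakeClient client;
  return client;
}

// Returns true if a worker thread observes a different client instance
// than the calling thread, demonstrating per-thread isolation.
bool workerSeesDistinctClient() {
  FakeClient* mine = &threadLocalClient();
  bool distinct = false;
  std::thread worker([&] { distinct = (&threadLocalClient() != mine); });
  worker.join();
  return distinct;
}
```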
@lizan can you tag me when the tests are finished and you address Roman's comments. Then I will take a look.
@mattklein123 sure, I'm making more changes since I discovered some problems in the tests. In the meantime it would be nice if you could take a look at the interfaces (i.e. async_client.h).
@lizan sure will do.
include/envoy/http/async_client.h
Outdated
/**
 * Close the stream.
 */
virtual void close() PURE;
What does this do exactly? The above methods implicitly send end_stream. Is this a reset? If so can we call it reset() and provide a reset reason?
This is comparable to Request::cancel(), and its implementation amounts to resetting the stream and cleaning up. I can rename it to reset(). The underlying Filter (i.e. router_)'s interface doesn't have a reset reason, so I didn't provide one here. Do you think it is needed?
Sorry you are right, I just looked at the code again. The fact that there is no reason is due to how we are wrapping the router filter to do all of this. I would just rename to reset() for clarity since IMO that makes more sense in the streaming case.
 * the handle can be used to send more messages or close the stream.
 */
virtual Stream* start(StreamCallbacks& callbacks,
                      const Optional<std::chrono::milliseconds>& timeout) PURE;
It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long lived streaming API requests like you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though).
I don't have a strong opinion either. I will update comments soon.
Done
Explicitly say that if no timeout is given, it's infinite.
What is behavior if timeout is hit, does it implicitly call Stream.reset() and by implication the Callback.onResetStream gets called & on what thread?
This is all single threaded. Yes, when timeout is hit the stream will be reset with callbacks called.
On timeout, it will call StreamCallbacks.onHeaders with :status 504 and then onData; the headers and data come from the underlying filters.
See this test:
https://github.com/lyft/envoy/pull/353/files#diff-918cf37446b66e452d1114b88b1e61c4R578
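The timeout behavior discussed above can be sketched as follows (all types here are hypothetical simplifications, not Envoy's): everything is single threaded, so when the timer fires the stream synthesizes a 504 response through the normal callbacks and then counts as reset.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical callbacks interface: only the headers callback is modeled.
struct StreamCallbacks {
  std::function<void(const std::string& status, bool end_stream)> on_headers;
};

// Hypothetical stream: onTimeout() simulates the timeout timer firing on
// the owning thread.
struct FakeStream {
  StreamCallbacks* callbacks = nullptr;
  bool reset_ = false;

  void onTimeout() {
    // Synthesized gateway-timeout response from the underlying filters,
    // followed by a reset of the stream.
    callbacks->on_headers("504", true);
    reset_ = true;
  }
};
```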
@mattklein123 Thanks for reviewing it, and sorry I was busy on other things today; will update the PR with tests soon.
@mattklein123 This is ready to review, PTAL. Thanks!
@RomanDzhabarov please review. I will take a final pass once you are done.
I'll take another pass on this today.
include/envoy/http/async_client.h
Outdated
virtual ~Stream() {}

/**
 * Send headers to the stream.
Explicitly say the sendHeaders method cannot be invoked more than once?
Done
Router::ProdFilter router_;
std::function<void()> reset_callback_;
AccessLog::RequestInfoImpl request_info_;
RouteImpl route_;
bool complete_{};
Buffer::Instance* decoding_buffer_{nullptr};
nit: decoding_buffer_{}
Done
test/mocks/http/mocks.h
Outdated
void onResetStream() override { onResetStream_(); }

MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(onData_, void(Buffer::Instance& data, bool end_stream));
"void onData(Buffer::Instance& data, bool end_stream) override { onData_(data, end_stream); }" is not required, just define MOCK_METHOD2 properly for onData.
Done
test/mocks/http/mocks.h
Outdated
}
void onData(Buffer::Instance& data, bool end_stream) override { onData_(data, end_stream); }
void onTrailers(HeaderMapPtr&& trailers) override { onTrailers_(*trailers); }
void onResetStream() override { onResetStream_(); }
ditto
done
test/mocks/http/mocks.h
Outdated
@@ -317,6 +338,19 @@ class MockAsyncClientRequest : public AsyncClient::Request {

  MockAsyncClient* client_;
};

class MockAsyncClientStream : public AsyncClient::Stream {
where is this used at all? and other added mocks.
This mock is not used, removed. What are the others? MockAsyncClientStreamCallbacks is used in AsyncClientImplTest.
@@ -38,7 +38,40 @@ class AsyncClient {
};

/**
 * An in-flight HTTP request
 * Notifies caller of async HTTP stream status.
It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.
Done
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
  local_closed_ |= end_stream;
add a debug assertion if local is already closed and end_stream is true
Done
void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
  response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
@mattklein123 this is some odd nomenclature, why are headers received from the transport in a method called 'encode...' ?
I explained this above, this is an internal implementation detail, the response is being "encoded" into the client, which is then dispatching the appropriate callbacks.
@lizan there are a huge number of comments on this review. I'm fine with the overall approach. When everyone's concerns are satisfied or if there are other questions, let me know and I can review in detail.
@fengli79 @louiscryan @RomanDzhabarov PTAL again, thanks!
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
                                            const Optional<std::chrono::milliseconds>& timeout) {
  std::unique_ptr<AsyncRequestImpl> new_request{
      new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
  AsyncRequestImpl* async_request =
nit: just initialize this directly in unique_ptr and return front().get() like you do below.
Having a raw pointer here is to avoid downcasting from AsyncStreamImpl to AsyncRequestImpl when it returns. AsyncStreamImpl is not a subclass of AsyncClient::Request.
ok sorry sounds good
ASSERT(!(local_closed_ && end_stream));

local_closed_ |= end_stream;
if (complete())
nit: braces around all if statements
void AsyncStreamImpl::closeRemote(bool end_stream) {
  remote_closed_ |= end_stream;
  if (complete())
nit: braces around all if statements
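The closeLocal/closeRemote logic quoted above amounts to a small half-close state machine: the stream is complete only once both directions have seen end_stream. A standalone sketch (hypothetical names, including the debug assertion suggested earlier in the review):

```cpp
#include <cassert>

// Sketch of the half-close bookkeeping: each side is closed independently,
// and the stream is complete only when both directions have ended.
struct HalfCloseState {
  bool local_closed_{};
  bool remote_closed_{};

  bool complete() const { return local_closed_ && remote_closed_; }

  void closeLocal(bool end_stream) {
    // Debug assertion: sending end_stream twice is a programmer error.
    assert(!(local_closed_ && end_stream));
    local_closed_ |= end_stream;
  }

  void closeRemote(bool end_stream) { remote_closed_ |= end_stream; }
};
```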
void onTrailers(HeaderMapPtr&& trailers) override;
void onReset() override;
void onComplete();
MessagePtr request_;
nit: newline before this line
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onReset() override;
void onComplete();
nit: move to top of section
virtual void cancel() override;

private:
  void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
prefix overrides with // AsyncClient::StreamCallbacks
cleanup();
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
  router_.decodeData(data, end_stream);
  decoding_buffer_ = &data;
This is not quite right and is kind of complicated. In short:
- When you call router_.decodeData(...) it will return a response code which we currently ignore which tells you whether to buffer or not. Buffering is only done in certain cases, for example, if we might retry, or shadow. In this case, the "connection manager" (AsyncClient) is expected to buffer the entire request.
- In the streaming case, it doesn't make sense to buffer. Therefore, I would probably check the return code of router_.decodeData(...) and if it asks for buffering, throw an exception of some kind to indicate programmer error.
so basically for the streaming case I think decodingBuffer() should return nullptr, in this normal case, it should return the request body.
Sorry just to further clarify, buffering should continue to work in the AsyncRequestImpl case (non-streaming).
Done
if (!new_request->complete_) {
  new_request->moveIntoList(std::move(new_request), active_requests_);
  return active_requests_.front().get();
if (!new_request->complete()) {
If I'm reading the code correctly, I think you actually want to check for remote_close_ here now, not complete_, or, arguably better, you need to set local_close_ in resetStream().
The reason for this is that the router might respond right away. If it responds before the request is complete, it will reset. In this case, you should return a null handle.
Sorry, just looked at the code in detail (I haven't looked at this stuff in a while). The router will not actually call resetStream() on its own; it relies on the connection manager to reset the request when the response is complete. (The main server makes the assumption that even in the streaming case, if the response is finished, the request should be reset if it is not finished).
So, in this case, I would probably just check for remote_close_ and if that is set, return a null handle and do not send any more data. You could also replicate the connection manager behavior, and if remote is closed before local is closed, close local and "reset."
P.S., this stuff is pretty complicated. If you want to chat in realtime find me in the public Gitter.
Looked at the code again; checking remote_close_ sounds reasonable, done.
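The resolution above, checking remote_closed_ and handing back a null handle when the router already finished the response during start(), can be sketched as follows (all types and names here are hypothetical simplifications):

```cpp
#include <cassert>
#include <list>
#include <memory>

// Hypothetical stream: remote_closed_ is set when the router produces a
// complete response immediately (e.g. an error) during start().
struct FakeStream {
  bool remote_closed_{false};
};

// Sketch of start(): only keep and return the stream if the remote side is
// still open; otherwise return a null handle and send no more data.
FakeStream* startStream(bool responds_immediately,
                        std::list<std::unique_ptr<FakeStream>>& active) {
  auto stream = std::make_unique<FakeStream>();
  stream->remote_closed_ = responds_immediately;
  if (!stream->remote_closed_) {
    active.push_front(std::move(stream));
    return active.front().get();
  }
  return nullptr; // response already complete: caller gets no handle
}
```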
@mattklein123 Sorry for the delay, PTAL when you have time, thanks!
FilterDataStatus status = router_.decodeData(data, end_stream);

if (status == FilterDataStatus::StopIterationAndBuffer) {
  decoding_buffer_ = &data;
This is still not right. To properly support buffering you actually need to buffer the data. For example, have a Buffer::OwnedImpl and add each data frame to it. I don't think this is worth supporting, as I don't think there is any reasonable case where a streaming request will need buffering. I would refactor the code slightly in such a way that in the base streaming case, you don't support buffering (throw an exception if something results in buffering being needed), but in the non-streaming case you do what we did previously, which is to just return the request's body data.
Done, refactored it to throw exception in AsyncStreamImpl, and return body in AsyncRequestImpl.
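The refactor described here splits the two paths: the streaming case treats a buffering request as a programmer error, while the non-streaming case accumulates the body. A minimal sketch under these assumptions (the enum mirrors the FilterDataStatus discussed above; the handler functions are hypothetical):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Mirrors the filter's return status discussed above: it signals whether
// the "connection manager" (AsyncClient) must buffer the data frame.
enum class FilterDataStatus { Continue, StopIterationAndBuffer };

// Streaming path: buffering makes no sense, so raise an exception to flag
// programmer error (hypothetical helper, analogous to AsyncStreamImpl).
void handleStreamingData(FilterDataStatus status) {
  if (status == FilterDataStatus::StopIterationAndBuffer) {
    throw std::runtime_error("buffering not supported for streaming requests");
  }
}

// Non-streaming path: accumulate each frame so the whole body can be
// replayed for retries or shadowing (analogous to AsyncRequestImpl).
void handleRequestData(FilterDataStatus status, const std::string& frame,
                       std::string& buffer) {
  if (status == FilterDataStatus::StopIterationAndBuffer) {
    buffer += frame;
  }
}
```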
@@ -164,7 +164,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
  AccessLog::RequestInfo& requestInfo() override { return request_info_; }
  const std::string& downstreamAddress() override { return EMPTY_STRING; }
  void continueDecoding() override { NOT_IMPLEMENTED; }
  const Buffer::Instance* decodingBuffer() override { return decoding_buffer_; }
  const Buffer::Instance* decodingBuffer() override { NOT_IMPLEMENTED; }
this is going to just crash. Can you throw an actual EnvoyException (which will also end up crashing the process but indirectly) and add a test for this case please.
Done
Description: this PR fixes a couple bugs to allow e2e use of the Envoy Mobile library in the swift iOS demo. Risk Level: low - fixing bugs Testing: ./bazelw build --config=ios --xcode_version=10.3.0.10G8 //:ios_dist and then ./bazelw run --config=ios --xcode_version=10.3.0.10G8 //examples/swift/hello_world:app Signed-off-by: Jose Nino <[email protected]> Signed-off-by: JP Simard <[email protected]>
Rename AsyncRequestImpl to AsyncStreamImpl and add streaming requests / responses; AsyncRequestImpl is now based on AsyncStreamImpl.
Existing AsyncClientImplTest covers most of AsyncStreamImpl. Working on more tests for streams. @qiwzhang PTAL.
Closes #317.