Extend AsyncClient to support streaming requests / responses #353
Changes from 14 commits
@@ -38,7 +38,40 @@ class AsyncClient {
};

/**
* Notifies caller of async HTTP stream status.
*/
class StreamCallbacks {
public:
virtual ~StreamCallbacks() {}

/**
* Called when the async HTTP stream receives headers.
Review comment: When all headers get received.
Reply: Done.
* @param headers the headers received
* @param end_stream whether the response is header only
*/
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE;

/**
* Called when the async HTTP stream receives data.
* @param data the data received
* @param end_stream whether the data is the last data frame
*/
virtual void onData(Buffer::Instance& data, bool end_stream) PURE;
Review comment: It can be invoked multiple times if the data is streamed.
Reply: Done.
Review comment: How is stream closure signalled even if no data was delivered? It seems cleaner to separate this into its own callback; otherwise closure handling will end up being overloaded into each handler implementation.
Review comment: To me, StreamDecoderFilter is an interface that already provides all of this functionality. The Stream interface looks like an unnecessary wrapper on top of it. Using StreamDecoderFilter directly, by getting the router (or a wrapper of the router with an active count, etc.), is more straightforward.
Reply: I think I mostly agree about StreamDecoderFilter, but there are many aspects of StreamDecoderFilterCallbacks that people are unlikely to need or want. The fact that an explicit 'reset' handler must be installed does make that API less obvious than this one.
Review comment: Hmm... the more I look at this the less convinced I am. The StreamXXXFilter methods are more like the ones I would expect a client to implement, and the StreamXXXFilterCallbacks look more like the write interface. @mattklein123 can probably shed some light on the relationships and why things are implemented this way. For pure client use cases I would expect an API more like the one @lizan wrote.
Reply: I think the simple interface that @lizan wrote is the one to use. Here is why this all works the way it does: AsyncClient is implemented by basically mimicking ActiveStream inside the HTTP connection manager, and it instantiates a router filter to do the work. On one hand this is kind of a hack; on the other hand, it's an implementation detail, so it doesn't really matter. So, at the end of the day, the async client internally implements the StreamXXXCallback methods, which are the write interface, and then translates those "writes" into async client request/stream callbacks, which is this interface.
/**
* Called when the async HTTP stream receives trailers.
* @param trailers the trailers received.
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;
Review comment: When all trailers get received.
Reply: Done.
/**
* Called when the async HTTP stream is reset.
*/
virtual void onResetStream() PURE;
Review comment: onReset, to match the reset() method in the Stream interface. Or change it to resetStream().
Reply: Renamed to onReset.
};
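To make the read side concrete, here is a minimal, hypothetical StreamCallbacks implementation a caller might supply to AsyncClient::start() (declared further down in this header). It is not part of this diff; the include paths, namespace, class, and member names are illustrative assumptions, and it only reflects the semantics discussed in the review above: onHeaders fires once when all headers are received, onData may fire many times, and onResetStream is terminal.

// Illustrative sketch only -- not part of this PR. Include paths and names are assumed.
#include <cstdint>

#include "envoy/buffer/buffer.h"
#include "envoy/http/async_client.h"

namespace Example {

// Minimal read-side handler the async client drives as the response streams in.
class CountingStreamCallbacks : public Http::AsyncClient::StreamCallbacks {
public:
  // Fires once, after all response headers have been received.
  void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override {
    headers_ = std::move(headers);
    if (end_stream) {
      done_ = true; // header-only response
    }
  }

  // May fire many times as response data streams in.
  void onData(Buffer::Instance& data, bool end_stream) override {
    bytes_received_ += data.length();
    if (end_stream) {
      done_ = true;
    }
  }

  // Fires once; trailers implicitly end the stream.
  void onTrailers(Http::HeaderMapPtr&& trailers) override {
    trailers_ = std::move(trailers);
    done_ = true;
  }

  // Terminal: no further callbacks arrive after a reset.
  void onResetStream() override { reset_ = true; }

private:
  Http::HeaderMapPtr headers_;
  Http::HeaderMapPtr trailers_;
  uint64_t bytes_received_{0};
  bool done_{false};
  bool reset_{false};
};

} // namespace Example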
/**
* An in-flight HTTP request.
*/
class Request {
public:
@@ -50,18 +83,64 @@ class AsyncClient {
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream.
*/
class Stream {
public:
virtual ~Stream() {}

/**
* Send headers to the stream.
Review comment: Explicitly say that sendHeaders cannot be invoked more than once?
Reply: Done.
* @param headers supplies the headers to send.
* @param end_stream supplies whether this is a header only request.
*/
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE;

/**
* Send data to the stream.
* @param data supplies the data to send.
* @param end_stream supplies whether this is the last data.
*/
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;
Review comment: Explicitly say that sendData can be invoked multiple times if the data is streamed.
Review comment: See the comment above about closing without data. I don't mind collapsing the data write with closure here, but then this API needs to document how to do that with no data, e.g. using an empty or null buffer?
Review comment: ... also, what is the behavior if a call is made after end_stream has already occurred?
Reply: It's a programming error. It looks like an error code should be returned.
Review comment: So we need error codes that distinguish a programming error (double local close/reset) from a transport failure (local close after remote reset).
Review comment: A few things:
Reply: Done. Documented that sendData should be called with an empty buffer to half close the stream.
/**
* Send trailers. This implicitly ends the stream.
* @param trailers supplies the trailers to send.
*/
virtual void sendTrailers(HeaderMap& trailers) PURE;
Review comment: Same as sendHeaders: sendTrailers cannot be invoked more than once.
Reply: Done.
/**
* Reset the stream.
*/
virtual void reset() PURE;
Review comment: Does this call trigger StreamCallbacks.onResetStream, or does that only occur if the reset is received from the remote side? If we're thread local, then it seems like it should call it.
Reply: It does call it, but this is an internal implementation detail of the async client/request, so I don't think we need to expose it from this interface.
Reply: StreamCallbacks.onReset only occurs if the reset is received from the remote side.
Reply: Now reset() will call StreamCallbacks.onReset.
};

virtual ~AsyncClient() {}

/**
* Send an HTTP request asynchronously
* @param request the request to send.
* @param callbacks the callbacks to be notified of request status.
* @param timeout supplies the request timeout
* @return a request handle or nullptr if no request could be created. NOTE: In this case
*         onFailure() has already been called inline. The client owns the request and the
*         handle should just be used to cancel.
*/
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
                      const Optional<std::chrono::milliseconds>& timeout) PURE;
/**
* Start an HTTP stream asynchronously.
* @param callbacks the callbacks to be notified of stream status.
* @param timeout supplies the stream timeout, measured from when the frame with the end_stream
*        flag is sent until the first frame is received.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
*         onResetStream() has already been called inline. The client owns the stream and
*         the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
                      const Optional<std::chrono::milliseconds>& timeout) PURE;
Review comment: It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long lived streaming API requests like the ones you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though.)
Reply: I don't have a strong opinion either. I will update the comments soon.
Reply: Done.
Review comment: Explicitly say that if no timeout is given, it's infinite.
Review comment: What is the behavior if the timeout is hit? Does it implicitly call Stream.reset(), and by implication does StreamCallbacks.onResetStream get called, and on what thread?
Reply: This is all single threaded. Yes, when the timeout is hit the stream will be reset and the callbacks called.
Reply: On timeout, it will call StreamCallbacks.onHeaders with :status 504 and then onData; the header and data come from the underlying filters. See this test:
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
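For contrast with the new streaming interface, here is a hedged caller-side sketch of the existing one-shot send() path. Only the send() signature and the null-handle contract come from the header above; the helper function, include paths, and the way the request message and callbacks are built elsewhere are assumptions.

// Hypothetical caller-side sketch; how `request` and `callbacks` are constructed is out of scope.
#include <chrono>

#include "envoy/http/async_client.h"

void fireOneShot(Http::AsyncClient& client, Http::MessagePtr&& request,
                 Http::AsyncClient::Callbacks& callbacks) {
  // A 250ms timeout, wrapped in Envoy's Optional as the interface requires.
  Http::AsyncClient::Request* handle =
      client.send(std::move(request), callbacks,
                  Optional<std::chrono::milliseconds>(std::chrono::milliseconds(250)));
  if (handle == nullptr) {
    // send() failed inline and onFailure() has already been invoked; there is nothing to track.
    return;
  }
  // The client owns the in-flight request; keep `handle` only if cancel() may be needed later.
}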
@@ -1,15 +1,14 @@
#include "async_client_impl.h"
#include "headers.h"

namespace Http {

const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>>
AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_;
const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_;
const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_;
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_;
AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_;
const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_;
const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_;
const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_;
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
                                 Event::Dispatcher& dispatcher,
@@ -22,120 +21,182 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St
dispatcher_(dispatcher) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_requests_.empty()) {
active_requests_.front()->failDueToClientDestroy();
while (!active_streams_.empty()) {
active_streams_.front()->failDueToClientDestroy();
}
}

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
                                            const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
AsyncRequestImpl* async_request =
Review comment: nit: just initialize this directly in the unique_ptr and return front().get() like you do below.
Reply: Having a raw pointer here is to avoid downcasting from
Review comment: OK, sorry, sounds good.
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout);
std::unique_ptr<AsyncStreamImpl> new_request{async_request};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
if (!new_request->complete()) {
Review comment: If I'm reading the code correctly, I think you actually want to check for remote_close_ here now, not complete_, or, arguably better, you need to set local_close_ in resetStream(). The reason for this is that the router might respond right away. If it responds before the request is complete, it will reset. In this case, you should return a null handle.
Review comment: Sorry, I just looked at the code in detail (I haven't looked at this stuff in a while). The router will not actually call resetStream() on its own; it relies on the connection manager to reset the request when the response is complete. (The main server makes the assumption that even in the streaming case, if the response is finished, the request should be reset if it is not finished.) So, in this case, I would probably just check remote_close_ and, if it is set, return a null handle and not send any more data. You could also replicate the connection manager behavior and, if remote is closed before local is closed, close local and "reset."
Review comment: P.S., this stuff is pretty complicated. If you want to chat in real time, find me in the public Gitter.
Reply: Looked at the code again; checking remote_close_ sounds reasonable. Done.
new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
return nullptr;
}
}

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
                                   AsyncClient::Callbacks& callbacks,
                                   const Optional<std::chrono::milliseconds>& timeout)
    : request_(std::move(request)), parent_(parent), callbacks_(callbacks),
      stream_id_(parent.config_.random_.random()), router_(parent.config_),
      request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) {
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
                                            const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};
router_.setDecoderFilterCallbacks(*this);
request_->headers().insertEnvoyInternalRequest().value(
    Headers::get().EnvoyInternalRequestValues.True);
request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(request_->headers(), !request_->body());
if (!complete_ && request_->body()) {
router_.decodeData(*request_->body(), true);
// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->complete()) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
}

// TODO: Support request trailers.
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
                                 const Optional<std::chrono::milliseconds>& timeout)
    : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
      router_(parent.config_), request_info_(Protocol::Http11),
      route_(parent_.cluster_.name(), timeout) {

router_.setDecoderFilterCallbacks(*this);
// TODO: Correctly set protocol in request info when we support access logging.
}

AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); }
AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); }

void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
Review comment: @mattklein123 this is some odd nomenclature; why are headers received from the transport handled in a method called 'encode...'?
Reply: I explained this above: this is an internal implementation detail. The response is being "encoded" into the client, which then dispatches the appropriate callbacks.
#ifndef NDEBUG
log_debug("async http request response headers (end_stream={}):", end_stream);
response_->headers().iterate([](const HeaderEntry& header, void*) -> void {
headers->iterate([](const HeaderEntry& header, void*) -> void {
  log_debug("  '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

if (end_stream) {
onComplete();
}
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
log_trace("async http request response data (length={} end_stream={})", data.length(),
          end_stream);
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
stream_callbacks_.onData(data, end_stream);
closeRemote(end_stream);
}

void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
#ifndef NDEBUG
log_debug("async http request response trailers:");
response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void {
trailers->iterate([](const HeaderEntry& header, void*) -> void {
  log_debug("  '{}':'{}'", header.key().c_str(), header.value().c_str());
}, nullptr);
#endif

onComplete();
stream_callbacks_.onTrailers(std::move(trailers));
closeRemote(true);
}

void AsyncRequestImpl::cancel() {
reset_callback_();
cleanup();
void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
headers.insertForwardedFor().value(parent_.config_.local_info_.address());
router_.decodeHeaders(headers, end_stream);
closeLocal(end_stream);
}
Review comment: nit: add a blank line in between.
void AsyncRequestImpl::onComplete() {
complete_ = true;
callbacks_.onSuccess(std::move(response_));
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
decoding_buffer_ = &data;
Review comment: This is not quite right and is kind of complicated. In short:
So basically, for the streaming case I think decodingBuffer() should return nullptr, and in the normal case it should return the request body.
Review comment: Sorry, just to further clarify: buffering should continue to work in the AsyncRequestImpl (non-streaming) case.
Reply: Done.
closeLocal(end_stream);
}
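The buffering discussion above suggests that the router-facing decodingBuffer() should expose a buffered body only in the non-streaming case. A rough sketch of that split follows; it is only a reading of the review feedback, not code from this diff, and the exact decodingBuffer() signature, the request_->body() accessor shape, and how later commits actually resolved this are all assumptions.

// Rough sketch of the reviewers' suggestion above; signature and members are assumptions.
const Buffer::Instance* AsyncStreamImpl::decodingBuffer() {
  // Streaming path: data is handed straight to the router, nothing is retained.
  return nullptr;
}

const Buffer::Instance* AsyncRequestImpl::decodingBuffer() {
  // Buffered one-shot path: the body already lives in the request message.
  return request_->body().get();
}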
void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
router_.decodeTrailers(trailers);
closeLocal(true);
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
local_closed_ |= end_stream;
Review comment: Add a debug assertion if local is already closed and end_stream is true.
Reply: Done.
if (complete())
Review comment: nit: braces around all if statements.
  cleanup();
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete())
Review comment: nit: braces around all if statements.
  cleanup();
}
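complete() is called in both close paths above but its body is not shown in this hunk; presumably it is just the conjunction of the two half-close flags, along these lines (an assumption, not code from the PR):

// Assumed shape of the helper used by closeLocal()/closeRemote() above.
bool AsyncStreamImpl::complete() { return local_closed_ && remote_closed_; }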
void AsyncStreamImpl::reset() {
reset_callback_();
cleanup();
}
void AsyncRequestImpl::cleanup() {
response_.reset();
void AsyncStreamImpl::cleanup() {
reset_callback_ = nullptr;

// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (inserted()) {
removeFromList(parent_.active_requests_);
removeFromList(parent_.active_streams_);
}
}

void AsyncRequestImpl::resetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
void AsyncStreamImpl::resetStream() {
stream_callbacks_.onResetStream();
cleanup();
}

void AsyncRequestImpl::failDueToClientDestroy() {
void AsyncStreamImpl::failDueToClientDestroy() {
// In this case we are going away because the client is being destroyed. We need to both reset
// the stream as well as raise a failure callback.
reset_callback_();
resetStream();
}

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
                                   AsyncClient::Callbacks& callbacks,
                                   const Optional<std::chrono::milliseconds>& timeout)
    : AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onResetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
}

void AsyncRequestImpl::cancel() { reset(); }

} // Http
Review comment: It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance, StreamCallbacks can continue to receive events even after the Stream has had sendXXX(..., end_stream=true) called.
Reply: Done.
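To illustrate the full-duplex behavior requested above, here is a hedged end-to-end sketch of the streaming API. The handler class, include paths, and surrounding setup are assumptions; only start(), sendHeaders(), sendData() (including the empty-buffer half close documented earlier), reset() on failure, and the StreamCallbacks methods come from this PR. Note that later commits rename onResetStream() to onReset() per the review thread above.

// Hypothetical consumer of the streaming API; only the AsyncClient surface is from this PR.
#include <chrono>

#include "common/buffer/buffer_impl.h" // assumed path for Buffer::OwnedImpl
#include "envoy/http/async_client.h"

namespace Example {

class DuplexStreamer : public Http::AsyncClient::StreamCallbacks {
public:
  // Even after the local side is half closed below, the remote side keeps delivering events here.
  void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override {
    // On a stream timeout the client surfaces a locally generated ":status 504" response here
    // (per the review discussion), rather than a reset.
    headers_ = std::move(headers);
    if (end_stream) {
      stream_ = nullptr; // header-only response; the stream is complete.
    }
  }
  void onData(Buffer::Instance&, bool end_stream) override {
    if (end_stream) {
      stream_ = nullptr; // remote side finished after we already half closed locally.
    }
  }
  void onTrailers(Http::HeaderMapPtr&&) override { stream_ = nullptr; }
  void onResetStream() override { stream_ = nullptr; }

  void run(Http::AsyncClient& client, Http::HeaderMap& request_headers, Buffer::Instance& body) {
    // An unset Optional means no timeout, i.e. the stream can stay open indefinitely.
    stream_ = client.start(*this, Optional<std::chrono::milliseconds>());
    if (stream_ == nullptr) {
      return; // onResetStream() has already been called inline.
    }
    stream_->sendHeaders(request_headers, false); // may only be called once
    stream_->sendData(body, false);               // may be called repeatedly
    Buffer::OwnedImpl empty;
    stream_->sendData(empty, true); // half close the local side with an empty buffer
    // Responses continue to arrive via onHeaders()/onData()/onTrailers() until the remote side
    // finishes or the stream is reset.
  }

private:
  Http::AsyncClient::Stream* stream_{nullptr};
  Http::HeaderMapPtr headers_;
};

} // namespace Example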