-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend AsyncClient to support streaming requests / responses #353
Changes from all commits
ebbb140
7266341
a1d2b14
373ba1f
7c7ab57
803f824
f157b49
0a491f7
410e361
edbcb91
6b5b227
46ac4ca
663c6fc
afd4b0d
05538d4
944bde5
a1c75c7
ab38439
580edff
5a1ccc6
6bc5ef5
a321140
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,45 @@ class AsyncClient { | |
}; | ||
|
||
/** | ||
* An in-flight HTTP request | ||
* Notifies caller of async HTTP stream status. | ||
* Note the HTTP stream is full-duplex, even if the local to remote stream has been ended | ||
* by Stream.sendHeaders/sendData with end_stream=true or sendTrailers, | ||
* StreamCallbacks can continue to receive events until the remote to local stream is closed, | ||
* and vice versa. | ||
*/ | ||
class StreamCallbacks { | ||
public: | ||
virtual ~StreamCallbacks() {} | ||
|
||
/** | ||
* Called when all headers get received on the async HTTP stream. | ||
* @param headers the headers received | ||
* @param end_stream whether the response is header only | ||
*/ | ||
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; | ||
|
||
/** | ||
* Called when a data frame get received on the async HTTP stream. | ||
* This can be invoked multiple times if the data get streamed. | ||
* @param data the data received | ||
* @param end_stream whether the data is the last data frame | ||
*/ | ||
virtual void onData(Buffer::Instance& data, bool end_stream) PURE; | ||
|
||
/** | ||
* Called when all trailers get received on the async HTTP stream. | ||
* @param trailers the trailers received. | ||
*/ | ||
virtual void onTrailers(HeaderMapPtr&& trailers) PURE; | ||
|
||
/** | ||
* Called when the async HTTP stream is reset. | ||
*/ | ||
virtual void onReset() PURE; | ||
}; | ||
|
||
/** | ||
* An in-flight HTTP request. | ||
*/ | ||
class Request { | ||
public: | ||
|
@@ -50,18 +88,66 @@ class AsyncClient { | |
virtual void cancel() PURE; | ||
}; | ||
|
||
/** | ||
* An in-flight HTTP stream. | ||
*/ | ||
class Stream { | ||
public: | ||
virtual ~Stream() {} | ||
|
||
/*** | ||
* Send headers to the stream. This method cannot be invoked more than once and | ||
* need to be called before sendData. | ||
* @param headers supplies the headers to send. | ||
* @param end_stream supplies whether this is a header only request. | ||
*/ | ||
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE; | ||
|
||
/*** | ||
* Send data to the stream. This method can be invoked multiple times if it get streamed. | ||
* To end the stream without data, call this method with empty buffer. | ||
* @param data supplies the data to send. | ||
* @param end_stream supplies whether this is the last data. | ||
*/ | ||
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE; | ||
|
||
/*** | ||
* Send trailers. This method cannot be invoked more than once, and implicitly ends the stream. | ||
* @param trailers supplies the trailers to send. | ||
*/ | ||
virtual void sendTrailers(HeaderMap& trailers) PURE; | ||
|
||
/*** | ||
* Reset the stream. | ||
*/ | ||
virtual void reset() PURE; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this call trigger StreamCallbacks.onResetStream or does that only occur if the RESET is received from the remote side. If we're thread local then it seems like it should call. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does call, but this is an internal implementation detail of the async client / request, so I don't think we need to expose from this interface. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. StreamCallbacks.onReset only occur if the RESET is received from the remote. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now reset() will call StreamCallbacks.onReset |
||
}; | ||
|
||
virtual ~AsyncClient() {} | ||
|
||
/** | ||
* Send an HTTP request asynchronously | ||
* @param request the request to send. | ||
* @param callbacks the callbacks to be notified of request status. | ||
* @param timeout supplies the request timeout | ||
* @return a request handle or nullptr if no request could be created. NOTE: In this case | ||
* onFailure() has already been called inline. The client owns the request and the | ||
* handle should just be used to cancel. | ||
*/ | ||
virtual Request* send(MessagePtr&& request, Callbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) PURE; | ||
|
||
/** | ||
* Start an HTTP stream asynchronously. | ||
* @param callbacks the callbacks to be notified of stream status. | ||
* @param timeout supplies the stream timeout, measured since when the frame with end_stream | ||
* flag is sent until when the first frame is received. | ||
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case | ||
* onResetStream() has already been called inline. The client owns the stream and | ||
* the handle can be used to send more messages or close the stream. | ||
*/ | ||
virtual Stream* start(StreamCallbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) PURE; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long lived streaming API requests like you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't have a strong opinion either. I will update comments soon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explicitly says if no timeout given, it's infinite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is behavior if timeout is hit, does it implicitly call Stream.reset() and by implication the Callback.onResetStream gets called & on what thread? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is all single threaded. Yes, when timeout is hit the stream will be reset with callbacks called. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When timeout, it will call StreamCallbacks.onHeader with :status 504 and onData, the header and data is coming from underlying filters. See this test: |
||
}; | ||
|
||
typedef std::unique_ptr<AsyncClient> AsyncClientPtr; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,14 @@ | ||
#include "async_client_impl.h" | ||
#include "headers.h" | ||
|
||
namespace Http { | ||
|
||
const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>> | ||
AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_; | ||
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_; | ||
const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_; | ||
const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_; | ||
const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_; | ||
const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_; | ||
AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_; | ||
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_; | ||
const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_; | ||
const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_; | ||
const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_; | ||
const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_; | ||
|
||
AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store, | ||
Event::Dispatcher& dispatcher, | ||
|
@@ -22,120 +21,183 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St | |
dispatcher_(dispatcher) {} | ||
|
||
AsyncClientImpl::~AsyncClientImpl() { | ||
while (!active_requests_.empty()) { | ||
active_requests_.front()->failDueToClientDestroy(); | ||
while (!active_streams_.empty()) { | ||
active_streams_.front()->reset(); | ||
} | ||
} | ||
|
||
AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) { | ||
std::unique_ptr<AsyncRequestImpl> new_request{ | ||
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)}; | ||
AsyncRequestImpl* async_request = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: just initialize this directly in unique_ptr and return front().get() like you do below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having a raw pointer here is to avoid downcasting from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok sorry sounds good |
||
new AsyncRequestImpl(std::move(request), *this, callbacks, timeout); | ||
std::unique_ptr<AsyncStreamImpl> new_request{async_request}; | ||
|
||
// The request may get immediately failed. If so, we will return nullptr. | ||
if (!new_request->complete_) { | ||
new_request->moveIntoList(std::move(new_request), active_requests_); | ||
return active_requests_.front().get(); | ||
if (!new_request->remote_closed_) { | ||
new_request->moveIntoList(std::move(new_request), active_streams_); | ||
return async_request; | ||
} else { | ||
return nullptr; | ||
} | ||
} | ||
|
||
AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, | ||
AsyncClient::Callbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) | ||
: request_(std::move(request)), parent_(parent), callbacks_(callbacks), | ||
stream_id_(parent.config_.random_.random()), router_(parent.config_), | ||
request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) { | ||
AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) { | ||
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)}; | ||
|
||
router_.setDecoderFilterCallbacks(*this); | ||
request_->headers().insertEnvoyInternalRequest().value( | ||
Headers::get().EnvoyInternalRequestValues.True); | ||
request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address()); | ||
router_.decodeHeaders(request_->headers(), !request_->body()); | ||
if (!complete_ && request_->body()) { | ||
router_.decodeData(*request_->body(), true); | ||
// The request may get immediately failed. If so, we will return nullptr. | ||
if (!new_stream->remote_closed_) { | ||
new_stream->moveIntoList(std::move(new_stream), active_streams_); | ||
return active_streams_.front().get(); | ||
} else { | ||
return nullptr; | ||
} | ||
} | ||
|
||
// TODO: Support request trailers. | ||
AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) | ||
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()), | ||
router_(parent.config_), request_info_(Protocol::Http11), | ||
route_(parent_.cluster_.name(), timeout) { | ||
|
||
router_.setDecoderFilterCallbacks(*this); | ||
// TODO: Correctly set protocol in request info when we support access logging. | ||
} | ||
|
||
AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); } | ||
AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); } | ||
|
||
void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) { | ||
response_.reset(new ResponseMessageImpl(std::move(headers))); | ||
void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mattklein123 this is some odd nomenclature, why are headers received from the transport in a method called 'encode...' ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I explained this above, this is an internal implementation detail, the response is being "encoded" into the client, which is then dispatching the appropriate callbacks. |
||
#ifndef NDEBUG | ||
log_debug("async http request response headers (end_stream={}):", end_stream); | ||
response_->headers().iterate([](const HeaderEntry& header, void*) -> void { | ||
headers->iterate([](const HeaderEntry& header, void*) -> void { | ||
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str()); | ||
}, nullptr); | ||
#endif | ||
|
||
if (end_stream) { | ||
onComplete(); | ||
} | ||
stream_callbacks_.onHeaders(std::move(headers), end_stream); | ||
closeRemote(end_stream); | ||
} | ||
|
||
void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) { | ||
void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { | ||
log_trace("async http request response data (length={} end_stream={})", data.length(), | ||
end_stream); | ||
if (!response_->body()) { | ||
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()}); | ||
} | ||
response_->body()->move(data); | ||
|
||
if (end_stream) { | ||
onComplete(); | ||
} | ||
stream_callbacks_.onData(data, end_stream); | ||
closeRemote(end_stream); | ||
} | ||
|
||
void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) { | ||
response_->trailers(std::move(trailers)); | ||
void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) { | ||
#ifndef NDEBUG | ||
log_debug("async http request response trailers:"); | ||
response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void { | ||
trailers->iterate([](const HeaderEntry& header, void*) -> void { | ||
log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str()); | ||
}, nullptr); | ||
#endif | ||
|
||
onComplete(); | ||
stream_callbacks_.onTrailers(std::move(trailers)); | ||
closeRemote(true); | ||
} | ||
|
||
void AsyncRequestImpl::cancel() { | ||
reset_callback_(); | ||
cleanup(); | ||
void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) { | ||
headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True); | ||
headers.insertForwardedFor().value(parent_.config_.local_info_.address()); | ||
router_.decodeHeaders(headers, end_stream); | ||
closeLocal(end_stream); | ||
} | ||
|
||
void AsyncRequestImpl::onComplete() { | ||
complete_ = true; | ||
callbacks_.onSuccess(std::move(response_)); | ||
cleanup(); | ||
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { | ||
router_.decodeData(data, end_stream); | ||
closeLocal(end_stream); | ||
} | ||
|
||
void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { | ||
router_.decodeTrailers(trailers); | ||
closeLocal(true); | ||
} | ||
|
||
void AsyncStreamImpl::closeLocal(bool end_stream) { | ||
ASSERT(!(local_closed_ && end_stream)); | ||
|
||
local_closed_ |= end_stream; | ||
if (complete()) { | ||
cleanup(); | ||
} | ||
} | ||
|
||
void AsyncStreamImpl::closeRemote(bool end_stream) { | ||
remote_closed_ |= end_stream; | ||
if (complete()) { | ||
cleanup(); | ||
} | ||
} | ||
|
||
void AsyncRequestImpl::cleanup() { | ||
response_.reset(); | ||
void AsyncStreamImpl::reset() { | ||
reset_callback_(); | ||
resetStream(); | ||
} | ||
|
||
void AsyncStreamImpl::cleanup() { | ||
reset_callback_ = nullptr; | ||
|
||
// This will destroy us, but only do so if we are actually in a list. This does not happen in | ||
// the immediate failure case. | ||
if (inserted()) { | ||
removeFromList(parent_.active_requests_); | ||
removeFromList(parent_.active_streams_); | ||
} | ||
} | ||
|
||
void AsyncRequestImpl::resetStream() { | ||
// In this case we don't have a valid response so we do need to raise a failure. | ||
callbacks_.onFailure(AsyncClient::FailureReason::Reset); | ||
void AsyncStreamImpl::resetStream() { | ||
stream_callbacks_.onReset(); | ||
cleanup(); | ||
} | ||
|
||
void AsyncRequestImpl::failDueToClientDestroy() { | ||
// In this case we are going away because the client is being destroyed. We need to both reset | ||
// the stream as well as raise a failure callback. | ||
reset_callback_(); | ||
callbacks_.onFailure(AsyncClient::FailureReason::Reset); | ||
cleanup(); | ||
AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, | ||
AsyncClient::Callbacks& callbacks, | ||
const Optional<std::chrono::milliseconds>& timeout) | ||
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) { | ||
|
||
sendHeaders(request_->headers(), !request_->body()); | ||
if (!complete() && request_->body()) { | ||
sendData(*request_->body(), true); | ||
} | ||
// TODO: Support request trailers. | ||
} | ||
|
||
void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } | ||
|
||
void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { | ||
response_.reset(new ResponseMessageImpl(std::move(headers))); | ||
|
||
if (end_stream) { | ||
onComplete(); | ||
} | ||
} | ||
|
||
void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { | ||
if (!response_->body()) { | ||
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()}); | ||
} | ||
response_->body()->move(data); | ||
|
||
if (end_stream) { | ||
onComplete(); | ||
} | ||
} | ||
|
||
void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { | ||
response_->trailers(std::move(trailers)); | ||
onComplete(); | ||
} | ||
|
||
void AsyncRequestImpl::onReset() { | ||
if (!cancelled_) { | ||
// In this case we don't have a valid response so we do need to raise a failure. | ||
callbacks_.onFailure(AsyncClient::FailureReason::Reset); | ||
} | ||
} | ||
|
||
void AsyncRequestImpl::cancel() { | ||
cancelled_ = true; | ||
reset(); | ||
} | ||
|
||
} // Http |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done