Extend AsyncClient to support streaming requests / responses #353

Merged (22 commits) on Feb 2, 2017
include/envoy/http/async_client.h (76 additions, 0 deletions)

@@ -37,6 +37,39 @@ class AsyncClient {
virtual void onFailure(FailureReason reason) PURE;
};

/**
 * Notifies caller of async HTTP stream status.
 */

[Reviewer] It might be worth having some exposition about the full-duplex nature of streams in the documentation. For instance, StreamCallbacks can continue to receive events even if the Stream has had sendXXX(..., end_stream=true) called.

[Member Author] Done
class StreamCallbacks {
public:
virtual ~StreamCallbacks() {}

/**
 * Called when the async HTTP stream receives headers.
 * @param headers the headers received
 * @param end_stream whether the response is header only
 */
virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE;

[Contributor] When all headers get received.

[Member Author] Done

/**
* Called when the async HTTP stream receives data.
* @param data the data received
* @param end_stream whether the data is the last data frame
*/
virtual void onData(Buffer::Instance& data, bool end_stream) PURE;
[Contributor] It can be invoked multiple times if the data gets streamed.

[Member Author] Done

[@louiscryan, Jan 24, 2017] How is stream closure signalled if no data was delivered? It seems cleaner to separate closure into its own callback; otherwise closure handling will end up being overloaded into each handler implementation.

[Contributor] To me, StreamDecoderFilter is an interface that already provides all the functionality. The Stream interface looks like an unnecessary wrapper on top of it. Using StreamDecoderFilter directly, by getting the router (or a wrapper of the router with an active count, etc.), is more straightforward.

[Reviewer] I think I mostly agree about StreamDecoderFilter, but there are many aspects of StreamDecoderFilterCallbacks that people are unlikely to need or want. The fact that an explicit 'reset' handler must be installed (https://github.com/lyft/envoy/blob/89de442d82a1727aa7f53604748e953cc27b4849/include/envoy/http/filter.h#L69) does make that API less obvious than this one.

[Reviewer] Hmm... the more I look at this, the less convinced I am. The StreamXXXFilter methods are more like the ones I would expect a client to implement, and the StreamXXXFilterCallbacks look more like the write interface. @mattklein123 can probably shed some light on the relationships and why things are implemented this way. For pure client use cases I would expect an API more like the one @lizan wrote.

[Member] I think the simple interface that @lizan wrote is the one to use. Here is why this all works this way: AsyncClient is implemented so that it basically mimics ActiveStream inside the HTTP connection manager, and instantiates a router filter to do the work. On one hand this is kind of a hack; on the other hand it's an implementation detail, so it doesn't really matter. So, in the end, the async client internally implements the StreamXXXCallback methods, which are the write interface, and translates those "writes" into async client request/stream callbacks, which is this interface.
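The translation just described, with the client internally implementing the codec-style "write" methods and dispatching each write as a caller-facing callback, can be sketched with minimal stand-in types. RecordingCallbacks and StreamAdapter are hypothetical names for illustration only, not Envoy classes:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Caller-facing side: receives translated events (a stand-in for the
// role StreamCallbacks plays in this PR).
struct RecordingCallbacks {
  std::vector<std::string> events;
  void onHeaders(bool end_stream) { events.push_back(end_stream ? "headers*" : "headers"); }
  void onData(bool end_stream) { events.push_back(end_stream ? "data*" : "data"); }
};

// Internal side: the wrapped router filter "encodes" the response into
// this adapter, which dispatches the appropriate callback (a stand-in
// for AsyncStreamImpl's encodeXXX methods).
struct StreamAdapter {
  RecordingCallbacks& cb;
  void encodeHeaders(bool end_stream) { cb.onHeaders(end_stream); }
  void encodeData(bool end_stream) { cb.onData(end_stream); }
};
```

The diff below follows this shape: AsyncStreamImpl::encodeHeaders/encodeData/encodeTrailers forward to stream_callbacks_.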


/**
* Called when the async HTTP stream receives trailers.
* @param trailers the trailers received.
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;
[Contributor] When all trailers get received.

[Member Author] Done

/**
* Called when the async HTTP stream is reset.
*/
virtual void onResetStream() PURE;
[Contributor] onReset to match the reset() method in the Stream interface. Or change it to resetStream().

[Member Author] Renamed to onReset.
};
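A minimal sketch of what a StreamCallbacks implementation has to tolerate under the full-duplex contract discussed above: onData may arrive in multiple pieces, and end_stream may be signalled on any of the callbacks. The types below are simplified stand-ins (std::map, std::string), not Envoy's real HeaderMap and Buffer::Instance:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Simplified stand-ins for Envoy's HeaderMapPtr and Buffer::Instance.
using HeaderMap = std::map<std::string, std::string>;
using HeaderMapPtr = std::unique_ptr<HeaderMap>;

// Collects stream events so the contract is visible: onData may fire
// many times per response, and the body must be accumulated.
class CollectingCallbacks {
public:
  void onHeaders(HeaderMapPtr&& headers, bool end_stream) {
    status_ = (*headers)[":status"];
    saw_end_stream_ = end_stream;
  }
  void onData(const std::string& data, bool end_stream) {
    body_ += data; // accumulate: multiple data frames per response
    saw_end_stream_ = saw_end_stream_ || end_stream;
  }
  void onReset() { reset_ = true; }

  std::string status_;
  std::string body_;
  bool saw_end_stream_{false};
  bool reset_{false};
};
```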

/**
* An in-flight HTTP request
*/
@@ -50,6 +83,39 @@ class AsyncClient {
virtual void cancel() PURE;
};

/**
 * An in-flight HTTP stream
 */

[Member] Super nit: full stop here and down below for comments on methods.
class Stream {
public:
virtual ~Stream() {}

/**
* Send headers to the stream
* @param headers supplies the headers to send
* @param end_stream supplies whether this is a header only request
*/
virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE;

/**
* Send data to the stream
* @param data supplies the data to send
* @param end_stream supplies whether this is the last data
*/
virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;
[Contributor] Explicitly say that sendData can be invoked multiple times if the request is streamed.

[Reviewer] See comment above about closing without data. I don't mind collapsing the data write with closure here, but then this API needs to document how to do that with no data, e.g. using an empty or null buffer?

[Reviewer] ...also, what is the behavior if a call is made after end_stream has already occurred?

[Contributor] It's a programming error. Looks like an error code should be returned. Without changing the method signature, should it be ignored with an error log?

[@louiscryan, Jan 24, 2017] So we need code to distinguish between a programming error (double local close/reset) and a transport failure (local close after remote reset).

[Member] A few things:

1. The codec APIs generally follow the HTTP/2 framing format. Since in that API, if you want to close the stream after headers are complete but without data, you must send an empty data frame along with the end-stream bit, this is what we do also, for better or worse. Feel free to update the docs to account for this.

2. There are various asserts in different places to account for programmer error, but the main problem with this is object lifetime. Since streams may get destroyed right away, you are not guaranteed to actually be able to check vs. just be dealing with a memory corruption / crash situation. So in general we just have a contract that says that once certain events happen you can't use the object anymore. I've tried to be very clear about this in codec.h and filter.h, but let me know if it's not clear enough.

[Member Author] Done, documented that one should use sendData with an empty buffer to half-close the stream.
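The half-close convention settled on above can be sketched as follows. SketchStream is a hypothetical stand-in, not the real Stream interface; it also asserts the "no writes after end_stream" lifetime contract described by the reviewer:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for Buffer::Instance (not Envoy's real class).
using Buffer = std::string;

// Mirrors the HTTP/2 framing model: closing after headers with no body
// is done by sending an empty data frame with end_stream=true. Any
// send after end_stream is a programming error (asserted here).
class SketchStream {
public:
  void sendHeaders(bool end_stream) {
    assert(!local_closed_);
    frames_.push_back("HEADERS");
    local_closed_ = end_stream;
  }
  void sendData(const Buffer& data, bool end_stream) {
    assert(!local_closed_); // contract: no writes after end_stream
    frames_.push_back(data.empty() ? "DATA(empty)" : "DATA");
    local_closed_ = end_stream;
  }
  std::vector<std::string> frames_;
  bool local_closed_{false};
};
```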


/**
* Send trailers. This implicitly ends the stream.
* @param trailers supplies the trailers to send
*/
virtual void sendTrailers(HeaderMap& trailers) PURE;
[Contributor] Same as sendHeaders: sendTrailers cannot be invoked more than once.

[Member Author] Done


/**
* Close the stream.
*/
virtual void close() PURE;
[Member] What does this do exactly? The above methods implicitly send end_stream. Is this a reset? If so, can we call it reset() and provide a reset reason?

[Member Author] This is comparable to Request::cancel(), and its implementation amounts to resetting the stream and cleaning up. I can rename it to reset(). The underlying filter's (i.e. router_'s) interface doesn't have a reset reason, so I didn't provide one here. Do you think it is needed?

[Member] Sorry, you are right; I just looked at the code again. The fact that there is no reason is due to how we are wrapping the router filter to do all of this. I would just rename it to reset() for clarity, since IMO that makes more sense in the streaming case.
};

virtual ~AsyncClient() {}

/**
@@ -62,6 +128,16 @@
*/
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;

/**
* Start an HTTP stream asynchronously.
* @param callbacks the callbacks to be notified of stream status.
* @return a stream handle or nullptr if no stream could be started. NOTE: In this case
* onResetStream() has already been called inline. The client owns the stream and
* the handle can be used to send more messages or close the stream.
*/
virtual Stream* start(StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
[Member] It's questionable whether timeout makes sense on this interface, since the only reason I could see it really ever being used is for long-lived streaming API requests like the ones you are implementing it for. But I don't have a strong opinion and could go either way. I guess since it's optional, we can leave it. (Please update the comments though.)

[Member Author] I don't have a strong opinion either. I will update the comments soon.

[Member Author] Done

[Contributor] Explicitly say that if no timeout is given, it's infinite.

[@louiscryan, Jan 24, 2017] What is the behavior if the timeout is hit? Does it implicitly call Stream.reset(), and by implication does Callbacks.onResetStream get called, and on what thread?

[Member] This is all single-threaded. Yes, when the timeout is hit the stream will be reset with callbacks called.

[Member Author] On timeout, it will call StreamCallbacks.onHeaders with :status 504 followed by onData; the header and data come from the underlying filters. See this test: https://github.com/lyft/envoy/pull/353/files#diff-918cf37446b66e452d1114b88b1e61c4R578
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
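The start() contract documented above implies a specific calling pattern: a nullptr return means the stream already failed and the reset callback was invoked inline before start() returned, so the caller must not touch the handle. A sketch using hypothetical stand-in types (FakeClient is not Envoy's implementation):

```cpp
#include <cassert>
#include <functional>

// Minimal stand-ins illustrating the documented start() contract.
struct StreamCallbacks {
  std::function<void()> on_reset;
};

struct FakeClient {
  bool fail_immediately{false};
  int live_streams{0};

  // Mirrors AsyncClient::start(): on immediate failure, invoke the
  // reset callback inline and return nullptr.
  int* start(StreamCallbacks& cb) {
    if (fail_immediately) {
      cb.on_reset();
      return nullptr;
    }
    return &live_streams; // non-null "handle" owned by the client
  }
};
```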
source/common/http/async_client_impl.cc (105 additions, 57 deletions)
@@ -1,15 +1,14 @@
#include "async_client_impl.h"
#include "headers.h"

namespace Http {

 const std::vector<std::reference_wrapper<const Router::RateLimitPolicyEntry>>
-    AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
-const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_;
-const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_;
-const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_;
-const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_;
-const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_;
+    AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_;
+const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_;
+const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_;
+const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_;
+const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_;
+const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
Expand All @@ -29,92 +28,92 @@ AsyncClientImpl::~AsyncClientImpl() {

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
-  std::unique_ptr<AsyncRequestImpl> new_request{
-      new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)};
+  AsyncRequestImpl* async_request =
+      new AsyncRequestImpl(std::move(request), *this, callbacks, timeout);
+  std::unique_ptr<AsyncStreamImpl> new_request{async_request};

[Member] Nit: just initialize this directly in the unique_ptr and return front().get() like you do below.

[Member Author] Having a raw pointer here is to avoid downcasting from AsyncStreamImpl to AsyncRequestImpl when it returns. AsyncStreamImpl is not a subclass of AsyncClient::Request.

[Member] OK, sorry, sounds good.

// The request may get immediately failed. If so, we will return nullptr.
if (!new_request->complete_) {
new_request->moveIntoList(std::move(new_request), active_requests_);
-    return active_requests_.front().get();
+    return async_request;
} else {
return nullptr;
}
}
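The pattern defended in the thread above (capture a raw pointer to the derived type before moving ownership into a list of base-class unique_ptrs, so no downcast is needed on return) can be shown in isolation. Base and Derived are illustrative stand-ins for AsyncStreamImpl and AsyncRequestImpl:

```cpp
#include <cassert>
#include <list>
#include <memory>

struct Base {
  virtual ~Base() {}
};
struct Derived : Base {
  int id{42};
};

// Keep the derived-typed raw pointer before the unique_ptr (typed as
// Base) is moved into the owning list; return it without a downcast.
Derived* addToList(std::list<std::unique_ptr<Base>>& items) {
  Derived* raw = new Derived(); // remember the derived type
  items.emplace_back(raw);      // the list owns it as Base
  return raw;                   // no downcast required
}
```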

-AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
-                                   AsyncClient::Callbacks& callbacks,
-                                   const Optional<std::chrono::milliseconds>& timeout)
-    : request_(std::move(request)), parent_(parent), callbacks_(callbacks),
-      stream_id_(parent.config_.random_.random()), router_(parent.config_),
-      request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) {
+AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
+                                            const Optional<std::chrono::milliseconds>& timeout) {
+  AsyncStreamImpl* async_request = new AsyncStreamImpl(*this, callbacks, timeout);
+  std::unique_ptr<AsyncStreamImpl> new_request{async_request};

[Member] Nit: combine into one line: std::unique_ptr<AsyncStreamImpl> new_request(new AsyncStreamImpl(*this, callbacks, timeout));

-  router_.setDecoderFilterCallbacks(*this);
-  request_->headers().insertEnvoyInternalRequest().value(
-      Headers::get().EnvoyInternalRequestValues.True);
-  request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address());
-  router_.decodeHeaders(request_->headers(), !request_->body());
-  if (!complete_ && request_->body()) {
-    router_.decodeData(*request_->body(), true);
+  // The request may get immediately failed. If so, we will return nullptr.
+  if (!new_request->complete_) {
+    new_request->moveIntoList(std::move(new_request), active_requests_);
+    return async_request;
+  } else {
+    return nullptr;
   }
 }

-  // TODO: Support request trailers.
+AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
+                                 const Optional<std::chrono::milliseconds>& timeout)
+    : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
+      router_(parent.config_), request_info_(Protocol::Http11),
+      route_(parent_.cluster_.name(), timeout) {

+  router_.setDecoderFilterCallbacks(*this);
   // TODO: Correctly set protocol in request info when we support access logging.
 }

-AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); }
+AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); }

-void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
-  response_.reset(new ResponseMessageImpl(std::move(headers)));
+void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) {

[Reviewer] @mattklein123 this is some odd nomenclature; why are headers received from the transport handled in a method called 'encode...'?

[Member] I explained this above; this is an internal implementation detail. The response is being "encoded" into the client, which then dispatches the appropriate callbacks.

 #ifndef NDEBUG
   log_debug("async http request response headers (end_stream={}):", end_stream);
-  response_->headers().iterate([](const HeaderEntry& header, void*) -> void {
+  headers->iterate([](const HeaderEntry& header, void*) -> void {
     log_debug("  '{}':'{}'", header.key().c_str(), header.value().c_str());
   }, nullptr);
 #endif

-  if (end_stream) {
-    onComplete();
-  }
+  stream_callbacks_.onHeaders(std::move(headers), end_stream);
 }

-void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
+void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
   log_trace("async http request response data (length={} end_stream={})", data.length(),
             end_stream);
-  if (!response_->body()) {
-    response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
-  }
-  response_->body()->move(data);
-
-  if (end_stream) {
-    onComplete();
-  }
+  stream_callbacks_.onData(data, end_stream);
 }

-void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) {
-  response_->trailers(std::move(trailers));
+void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) {
 #ifndef NDEBUG
   log_debug("async http request response trailers:");
-  response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void {
+  trailers->iterate([](const HeaderEntry& header, void*) -> void {
     log_debug("  '{}':'{}'", header.key().c_str(), header.value().c_str());
   }, nullptr);
 #endif

-  onComplete();
+  stream_callbacks_.onTrailers(std::move(trailers));
 }

-void AsyncRequestImpl::cancel() {
-  reset_callback_();
-  cleanup();
+void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) {
+  headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True);
+  headers.insertForwardedFor().value(parent_.config_.local_info_.address());
+  router_.decodeHeaders(headers, end_stream);
 }
[Member] Nit: new line in between.

void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
router_.decodeData(data, end_stream);
decoding_buffer_ = &data;
[Member] This is not quite right, and is kind of complicated. In short:

1. When you call router_.decodeData(...) it returns a response code, which we currently ignore, telling you whether to buffer or not. Buffering is only done in certain cases, for example if we might retry or shadow. In that case, the "connection manager" (AsyncClient) is expected to buffer the entire request.

2. In the streaming case, it doesn't make sense to buffer. Therefore, I would probably check the return code of router_.decodeData(...) and, if it asks for buffering, throw an exception of some kind to indicate programmer error.

So basically, for the streaming case I think decodingBuffer() should return nullptr; in the normal case, it should return the request body.

[Member] Sorry, just to further clarify: buffering should continue to work in the AsyncRequestImpl case (non-streaming).

[Member Author] Done

}
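The fix agreed on above can be sketched as follows. The FilterDataStatus enum here is an illustrative stand-in for the router filter's actual return codes, not Envoy's exact API:

```cpp
#include <cassert>
#include <stdexcept>

// Stand-in for the status the router's decodeData() returns, telling
// the caller whether it is expected to buffer the request body.
enum class FilterDataStatus { Continue, StopIterationAndBuffer };

// Streaming path: buffering (needed only for retry/shadow) cannot be
// honored, so a buffering request is treated as a programming error.
void streamingSendData(FilterDataStatus status_from_router) {
  if (status_from_router == FilterDataStatus::StopIterationAndBuffer) {
    throw std::runtime_error("buffering not supported for streamed requests");
  }
  // Continue: data was forwarded without buffering.
}
```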

-void AsyncRequestImpl::onComplete() {
-  complete_ = true;
-  callbacks_.onSuccess(std::move(response_));
+void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { router_.decodeTrailers(trailers); }
+
+void AsyncStreamImpl::close() {
+  reset_callback_();
+  cleanup();
 }

-void AsyncRequestImpl::cleanup() {
-  response_.reset();
+void AsyncStreamImpl::cleanup() {
+  // response_.reset();
   reset_callback_ = nullptr;

// This will destroy us, but only do so if we are actually in a list. This does not happen in
@@ -124,18 +123,67 @@ void AsyncRequestImpl::cleanup() {
}
}

-void AsyncRequestImpl::resetStream() {
-  // In this case we don't have a valid response so we do need to raise a failure.
-  callbacks_.onFailure(AsyncClient::FailureReason::Reset);
+void AsyncStreamImpl::resetStream() {
+  stream_callbacks_.onResetStream();
   cleanup();
 }

-void AsyncRequestImpl::failDueToClientDestroy() {
+void AsyncStreamImpl::failDueToClientDestroy() {
   // In this case we are going away because the client is being destroyed. We need to both reset
   // the stream as well as raise a failure callback.
   reset_callback_();
-  callbacks_.onFailure(AsyncClient::FailureReason::Reset);
+  resetStream();
 }

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout)
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete_ && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
}

AsyncRequestImpl::~AsyncRequestImpl() {}

void AsyncRequestImpl::onComplete() {
complete_ = true;
callbacks_.onSuccess(std::move(response_));
cleanup();
}

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
response_.reset(new ResponseMessageImpl(std::move(headers)));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onResetStream() {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
}

void AsyncRequestImpl::cancel() { close(); }

} // Http