diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index 42742f0896ea..964301f09568 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -38,7 +38,45 @@ class AsyncClient { }; /** - * An in-flight HTTP request + * Notifies caller of async HTTP stream status. + * Note the HTTP stream is full-duplex, even if the local to remote stream has been ended + * by Stream.sendHeaders/sendData with end_stream=true or sendTrailers, + * StreamCallbacks can continue to receive events until the remote to local stream is closed, + * and vice versa. + */ + class StreamCallbacks { + public: + virtual ~StreamCallbacks() {} + + /** + * Called when all headers get received on the async HTTP stream. + * @param headers the headers received + * @param end_stream whether the response is header only + */ + virtual void onHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; + + /** + * Called when a data frame get received on the async HTTP stream. + * This can be invoked multiple times if the data get streamed. + * @param data the data received + * @param end_stream whether the data is the last data frame + */ + virtual void onData(Buffer::Instance& data, bool end_stream) PURE; + + /** + * Called when all trailers get received on the async HTTP stream. + * @param trailers the trailers received. + */ + virtual void onTrailers(HeaderMapPtr&& trailers) PURE; + + /** + * Called when the async HTTP stream is reset. + */ + virtual void onReset() PURE; + }; + + /** + * An in-flight HTTP request. */ class Request { public: @@ -50,18 +88,66 @@ class AsyncClient { virtual void cancel() PURE; }; + /** + * An in-flight HTTP stream. + */ + class Stream { + public: + virtual ~Stream() {} + + /*** + * Send headers to the stream. This method cannot be invoked more than once and + * need to be called before sendData. + * @param headers supplies the headers to send. + * @param end_stream supplies whether this is a header only request. + */ + virtual void sendHeaders(HeaderMap& headers, bool end_stream) PURE; + + /*** + * Send data to the stream. This method can be invoked multiple times if it get streamed. + * To end the stream without data, call this method with empty buffer. + * @param data supplies the data to send. + * @param end_stream supplies whether this is the last data. + */ + virtual void sendData(Buffer::Instance& data, bool end_stream) PURE; + + /*** + * Send trailers. This method cannot be invoked more than once, and implicitly ends the stream. + * @param trailers supplies the trailers to send. + */ + virtual void sendTrailers(HeaderMap& trailers) PURE; + + /*** + * Reset the stream. + */ + virtual void reset() PURE; + }; + virtual ~AsyncClient() {} /** * Send an HTTP request asynchronously * @param request the request to send. * @param callbacks the callbacks to be notified of request status. + * @param timeout supplies the request timeout * @return a request handle or nullptr if no request could be created. NOTE: In this case * onFailure() has already been called inline. The client owns the request and the * handle should just be used to cancel. */ virtual Request* send(MessagePtr&& request, Callbacks& callbacks, const Optional& timeout) PURE; + + /** + * Start an HTTP stream asynchronously. + * @param callbacks the callbacks to be notified of stream status. + * @param timeout supplies the stream timeout, measured since when the frame with end_stream + * flag is sent until when the first frame is received. + * @return a stream handle or nullptr if no stream could be started. NOTE: In this case + * onResetStream() has already been called inline. The client owns the stream and + * the handle can be used to send more messages or close the stream. + */ + virtual Stream* start(StreamCallbacks& callbacks, + const Optional& timeout) PURE; }; typedef std::unique_ptr AsyncClientPtr; diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index ffbdf370c4bf..2aae77c040d9 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -1,15 +1,14 @@ #include "async_client_impl.h" -#include "headers.h" namespace Http { const std::vector> - AsyncRequestImpl::NullRateLimitPolicy::rate_limit_policy_entry_; -const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::RouteEntryImpl::rate_limit_policy_; -const AsyncRequestImpl::NullRetryPolicy AsyncRequestImpl::RouteEntryImpl::retry_policy_; -const AsyncRequestImpl::NullShadowPolicy AsyncRequestImpl::RouteEntryImpl::shadow_policy_; -const AsyncRequestImpl::NullVirtualHost AsyncRequestImpl::RouteEntryImpl::virtual_host_; -const AsyncRequestImpl::NullRateLimitPolicy AsyncRequestImpl::NullVirtualHost::rate_limit_policy_; + AsyncStreamImpl::NullRateLimitPolicy::rate_limit_policy_entry_; +const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::RouteEntryImpl::rate_limit_policy_; +const AsyncStreamImpl::NullRetryPolicy AsyncStreamImpl::RouteEntryImpl::retry_policy_; +const AsyncStreamImpl::NullShadowPolicy AsyncStreamImpl::RouteEntryImpl::shadow_policy_; +const AsyncStreamImpl::NullVirtualHost AsyncStreamImpl::RouteEntryImpl::virtual_host_; +const AsyncStreamImpl::NullRateLimitPolicy AsyncStreamImpl::NullVirtualHost::rate_limit_policy_; AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store, Event::Dispatcher& dispatcher, @@ -22,120 +21,183 @@ AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::St dispatcher_(dispatcher) {} AsyncClientImpl::~AsyncClientImpl() { - while (!active_requests_.empty()) { - active_requests_.front()->failDueToClientDestroy(); + while (!active_streams_.empty()) { + active_streams_.front()->reset(); } } AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks, const Optional& timeout) { - std::unique_ptr new_request{ - new AsyncRequestImpl(std::move(request), *this, callbacks, timeout)}; + AsyncRequestImpl* async_request = + new AsyncRequestImpl(std::move(request), *this, callbacks, timeout); + std::unique_ptr new_request{async_request}; // The request may get immediately failed. If so, we will return nullptr. - if (!new_request->complete_) { - new_request->moveIntoList(std::move(new_request), active_requests_); - return active_requests_.front().get(); + if (!new_request->remote_closed_) { + new_request->moveIntoList(std::move(new_request), active_streams_); + return async_request; } else { return nullptr; } } -AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, - AsyncClient::Callbacks& callbacks, - const Optional& timeout) - : request_(std::move(request)), parent_(parent), callbacks_(callbacks), - stream_id_(parent.config_.random_.random()), router_(parent.config_), - request_info_(Protocol::Http11), route_(parent_.cluster_.name(), timeout) { +AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { + std::unique_ptr new_stream{new AsyncStreamImpl(*this, callbacks, timeout)}; - router_.setDecoderFilterCallbacks(*this); - request_->headers().insertEnvoyInternalRequest().value( - Headers::get().EnvoyInternalRequestValues.True); - request_->headers().insertForwardedFor().value(parent_.config_.local_info_.address()); - router_.decodeHeaders(request_->headers(), !request_->body()); - if (!complete_ && request_->body()) { - router_.decodeData(*request_->body(), true); + // The request may get immediately failed. If so, we will return nullptr. + if (!new_stream->remote_closed_) { + new_stream->moveIntoList(std::move(new_stream), active_streams_); + return active_streams_.front().get(); + } else { + return nullptr; } +} - // TODO: Support request trailers. +AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) + : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()), + router_(parent.config_), request_info_(Protocol::Http11), + route_(parent_.cluster_.name(), timeout) { + + router_.setDecoderFilterCallbacks(*this); // TODO: Correctly set protocol in request info when we support access logging. } -AsyncRequestImpl::~AsyncRequestImpl() { ASSERT(!reset_callback_); } +AsyncStreamImpl::~AsyncStreamImpl() { ASSERT(!reset_callback_); } -void AsyncRequestImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) { - response_.reset(new ResponseMessageImpl(std::move(headers))); +void AsyncStreamImpl::encodeHeaders(HeaderMapPtr&& headers, bool end_stream) { #ifndef NDEBUG log_debug("async http request response headers (end_stream={}):", end_stream); - response_->headers().iterate([](const HeaderEntry& header, void*) -> void { + headers->iterate([](const HeaderEntry& header, void*) -> void { log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str()); }, nullptr); #endif - if (end_stream) { - onComplete(); - } + stream_callbacks_.onHeaders(std::move(headers), end_stream); + closeRemote(end_stream); } -void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) { +void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) { log_trace("async http request response data (length={} end_stream={})", data.length(), end_stream); - if (!response_->body()) { - response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()}); - } - response_->body()->move(data); - - if (end_stream) { - onComplete(); - } + stream_callbacks_.onData(data, end_stream); + closeRemote(end_stream); } -void AsyncRequestImpl::encodeTrailers(HeaderMapPtr&& trailers) { - response_->trailers(std::move(trailers)); +void AsyncStreamImpl::encodeTrailers(HeaderMapPtr&& trailers) { #ifndef NDEBUG log_debug("async http request response trailers:"); - response_->trailers()->iterate([](const HeaderEntry& header, void*) -> void { + trailers->iterate([](const HeaderEntry& header, void*) -> void { log_debug(" '{}':'{}'", header.key().c_str(), header.value().c_str()); }, nullptr); #endif - onComplete(); + stream_callbacks_.onTrailers(std::move(trailers)); + closeRemote(true); } -void AsyncRequestImpl::cancel() { - reset_callback_(); - cleanup(); +void AsyncStreamImpl::sendHeaders(HeaderMap& headers, bool end_stream) { + headers.insertEnvoyInternalRequest().value(Headers::get().EnvoyInternalRequestValues.True); + headers.insertForwardedFor().value(parent_.config_.local_info_.address()); + router_.decodeHeaders(headers, end_stream); + closeLocal(end_stream); } -void AsyncRequestImpl::onComplete() { - complete_ = true; - callbacks_.onSuccess(std::move(response_)); - cleanup(); +void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { + router_.decodeData(data, end_stream); + closeLocal(end_stream); +} + +void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { + router_.decodeTrailers(trailers); + closeLocal(true); +} + +void AsyncStreamImpl::closeLocal(bool end_stream) { + ASSERT(!(local_closed_ && end_stream)); + + local_closed_ |= end_stream; + if (complete()) { + cleanup(); + } +} + +void AsyncStreamImpl::closeRemote(bool end_stream) { + remote_closed_ |= end_stream; + if (complete()) { + cleanup(); + } } -void AsyncRequestImpl::cleanup() { - response_.reset(); +void AsyncStreamImpl::reset() { + reset_callback_(); + resetStream(); +} + +void AsyncStreamImpl::cleanup() { reset_callback_ = nullptr; // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. if (inserted()) { - removeFromList(parent_.active_requests_); + removeFromList(parent_.active_streams_); } } -void AsyncRequestImpl::resetStream() { - // In this case we don't have a valid response so we do need to raise a failure. - callbacks_.onFailure(AsyncClient::FailureReason::Reset); +void AsyncStreamImpl::resetStream() { + stream_callbacks_.onReset(); cleanup(); } -void AsyncRequestImpl::failDueToClientDestroy() { - // In this case we are going away because the client is being destroyed. We need to both reset - // the stream as well as raise a failure callback. - reset_callback_(); - callbacks_.onFailure(AsyncClient::FailureReason::Reset); - cleanup(); +AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, + AsyncClient::Callbacks& callbacks, + const Optional& timeout) + : AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) { + + sendHeaders(request_->headers(), !request_->body()); + if (!complete() && request_->body()) { + sendData(*request_->body(), true); + } + // TODO: Support request trailers. +} + +void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } + +void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { + response_.reset(new ResponseMessageImpl(std::move(headers))); + + if (end_stream) { + onComplete(); + } +} + +void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { + if (!response_->body()) { + response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()}); + } + response_->body()->move(data); + + if (end_stream) { + onComplete(); + } +} + +void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { + response_->trailers(std::move(trailers)); + onComplete(); +} + +void AsyncRequestImpl::onReset() { + if (!cancelled_) { + // In this case we don't have a valid response so we do need to raise a failure. + callbacks_.onFailure(AsyncClient::FailureReason::Reset); + } +} + +void AsyncRequestImpl::cancel() { + cancelled_ = true; + reset(); } } // Http diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index a201502257d5..5793dc06f5fd 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -18,6 +18,7 @@ namespace Http { +class AsyncStreamImpl; class AsyncRequestImpl; class AsyncClientImpl final : public AsyncClient { @@ -32,12 +33,16 @@ class AsyncClientImpl final : public AsyncClient { Request* send(MessagePtr&& request, Callbacks& callbacks, const Optional& timeout) override; + Stream* start(StreamCallbacks& callbacks, + const Optional& timeout) override; + private: const Upstream::ClusterInfo& cluster_; Router::FilterConfig config_; Event::Dispatcher& dispatcher_; - std::list> active_requests_; + std::list> active_streams_; + friend class AsyncStreamImpl; friend class AsyncRequestImpl; }; @@ -45,17 +50,24 @@ class AsyncClientImpl final : public AsyncClient { * Implementation of AsyncRequest. This implementation is capable of sending HTTP requests to a * ConnectionPool asynchronously. */ -class AsyncRequestImpl final : public AsyncClient::Request, - StreamDecoderFilterCallbacks, - Logger::Loggable, - LinkedObject { +class AsyncStreamImpl : public AsyncClient::Stream, + StreamDecoderFilterCallbacks, + Logger::Loggable, + LinkedObject { public: - AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks, - const Optional& timeout); - ~AsyncRequestImpl(); - - // Http::AsyncHttpRequest - void cancel() override; + AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout); + virtual ~AsyncStreamImpl(); + + // Http::AsyncClient::Stream + void sendHeaders(HeaderMap& headers, bool end_stream) override; + void sendData(Buffer::Instance& data, bool end_stream) override; + void sendTrailers(HeaderMap& trailers) override; + void reset() override; + +protected: + bool complete() { return local_closed_ && remote_closed_; } + AsyncClientImpl& parent_; private: struct NullRateLimitPolicy : public Router::RateLimitPolicy { @@ -136,8 +148,9 @@ class AsyncRequestImpl final : public AsyncClient::Request, }; void cleanup(); - void failDueToClientDestroy(); - void onComplete(); + + void closeLocal(bool end_stream); + void closeRemote(bool end_stream); // Http::StreamDecoderFilterCallbacks void addResetStreamCallback(std::function callback) override { @@ -151,21 +164,51 @@ class AsyncRequestImpl final : public AsyncClient::Request, AccessLog::RequestInfo& requestInfo() override { return request_info_; } const std::string& downstreamAddress() override { return EMPTY_STRING; } void continueDecoding() override { NOT_IMPLEMENTED; } - const Buffer::Instance* decodingBuffer() override { return request_->body(); } + const Buffer::Instance* decodingBuffer() override { + throw EnvoyException("buffering is not supported in streaming"); + } void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; void encodeData(Buffer::Instance& data, bool end_stream) override; void encodeTrailers(HeaderMapPtr&& trailers) override; - MessagePtr request_; - AsyncClientImpl& parent_; - AsyncClient::Callbacks& callbacks_; + AsyncClient::StreamCallbacks& stream_callbacks_; const uint64_t stream_id_; - std::unique_ptr response_; Router::ProdFilter router_; std::function reset_callback_; AccessLog::RequestInfoImpl request_info_; RouteImpl route_; - bool complete_{}; + bool local_closed_{}; + bool remote_closed_{}; + + friend class AsyncClientImpl; +}; + +class AsyncRequestImpl final : public AsyncClient::Request, + AsyncStreamImpl, + AsyncClient::StreamCallbacks { +public: + AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks, + const Optional& timeout); + + // AsyncClient::Request + virtual void cancel() override; + +private: + void onComplete(); + + // AsyncClient::StreamCallbacks + void onHeaders(HeaderMapPtr&& headers, bool end_stream) override; + void onData(Buffer::Instance& data, bool end_stream) override; + void onTrailers(HeaderMapPtr&& trailers) override; + void onReset() override; + + // Http::StreamDecoderFilterCallbacks + const Buffer::Instance* decodingBuffer() override { return request_->body(); } + + MessagePtr request_; + AsyncClient::Callbacks& callbacks_; + std::unique_ptr response_; + bool cancelled_{}; friend class AsyncClientImpl; }; diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 8729b0df887a..34c6f326a942 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -42,6 +42,7 @@ class AsyncClientImplTest : public testing::Test { MessagePtr message_{new RequestMessageImpl()}; MockAsyncClientCallbacks callbacks_; + MockAsyncClientStreamCallbacks stream_callbacks_; NiceMock cm_; NiceMock stream_encoder_; StreamDecoder* response_decoder_{}; @@ -54,6 +55,42 @@ class AsyncClientImplTest : public testing::Test { AsyncClientImpl client_; }; +TEST_F(AsyncClientImplTest, BasicStream) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addViaCopy("x-envoy-internal", "true"); + headers.addViaCopy("x-forwarded-for", "127.0.0.1"); + headers.addViaCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), true)); + + TestHeaderMapImpl expected_headers{{":status", "200"}, {"x-envoy-upstream-service-time", "0"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, true); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, true); + + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("internal.upstream_rq_200").value()); +} + TEST_F(AsyncClientImplTest, Basic) { message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")}); Buffer::Instance& data = *message_->body(); @@ -131,6 +168,100 @@ TEST_F(AsyncClientImplTest, Retry) { response_decoder_->decodeHeaders(std::move(response_headers2), true); } +TEST_F(AsyncClientImplTest, NoRetryWithStream) { + ON_CALL(runtime_.snapshot_, featureEnabled("upstream.use_retry", 100)) + .WillByDefault(Return(true)); + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), true)); + + headers.insertEnvoyRetryOn().value(Headers::get().EnvoyRetryOnValues._5xx); + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, true); + + // Expect retry and retry timer create. + timer_ = new NiceMock(&dispatcher_); + HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "503"}}); + response_decoder_->decodeHeaders(std::move(response_headers), true); + + EXPECT_THROW(timer_->callback_(), EnvoyException); + EXPECT_CALL(stream_callbacks_, onReset()); +} + +TEST_F(AsyncClientImplTest, MultipleStreams) { + // Start stream 1 + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers(message_->headers()); + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), true)); + + TestHeaderMapImpl expected_headers{{":status", "200"}, {"x-envoy-upstream-service-time", "0"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, true); + + // Start stream 2 + Buffer::InstancePtr body2{new Buffer::OwnedImpl("test body")}; + NiceMock stream_encoder2; + StreamDecoder* response_decoder2{}; + MockAsyncClientStreamCallbacks stream_callbacks2; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder2, cm_.conn_pool_.host_); + response_decoder2 = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers2(message_->headers()); + EXPECT_CALL(stream_encoder2, encodeHeaders(HeaderMapEqualRef(&headers2), false)); + EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true)); + + TestHeaderMapImpl expected_headers2{{":status", "503"}, {"x-envoy-upstream-service-time", "0"}}; + EXPECT_CALL(stream_callbacks2, onHeaders_(HeaderMapEqualRef(&expected_headers2), true)); + + AsyncClient::Stream* stream2 = + client_.start(stream_callbacks2, Optional()); + stream2->sendHeaders(headers2, false); + stream2->sendData(*body2, true); + + // Finish stream 2. + HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "503"}}); + response_decoder2->decodeHeaders(std::move(response_headers2), true); + + // Finish stream 1. + HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(*body, true); +} + TEST_F(AsyncClientImplTest, MultipleRequests) { // Send request 1 message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")}); @@ -177,6 +308,99 @@ TEST_F(AsyncClientImplTest, MultipleRequests) { response_decoder_->decodeData(data, true); } +TEST_F(AsyncClientImplTest, StreamAndRequest) { + // Send request + message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")}); + Buffer::Instance& data = *message_->body(); + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true)); + + client_.send(std::move(message_), callbacks_, Optional()); + + // Start stream + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + NiceMock stream_encoder2; + StreamDecoder* response_decoder2{}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder2, cm_.conn_pool_.host_); + response_decoder2 = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + EXPECT_CALL(stream_encoder2, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body.get()), true)); + + TestHeaderMapImpl expected_headers{{":status", "200"}, {"x-envoy-upstream-service-time", "0"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, true); + + // Finish stream. + HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}}); + response_decoder2->decodeHeaders(std::move(response_headers2), false); + response_decoder2->decodeData(*body, true); + + // Finish request. + HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + expectSuccess(200); + response_decoder_->decodeData(data, true); +} + +TEST_F(AsyncClientImplTest, StreamWithTrailers) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + TestHeaderMapImpl trailers{{"some", "request_trailer"}}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), false)); + EXPECT_CALL(stream_encoder_, encodeTrailers(HeaderMapEqualRef(&trailers))); + + TestHeaderMapImpl expected_headers{{":status", "200"}, {"x-envoy-upstream-service-time", "0"}}; + TestHeaderMapImpl expected_trailers{{"some", "trailer"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); + EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers))); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, false); + stream->sendTrailers(trailers); + + HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(*body, false); + response_decoder_->decodeTrailers(HeaderMapPtr{new TestHeaderMapImpl{{"some", "trailer"}}}); +} + TEST_F(AsyncClientImplTest, Trailers) { message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("test body")}); Buffer::Instance& data = *message_->body(); @@ -217,6 +441,78 @@ TEST_F(AsyncClientImplTest, ImmediateReset) { EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value()); } +TEST_F(AsyncClientImplTest, LocalResetAfterStreamStart) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addViaCopy("x-envoy-internal", "true"); + headers.addViaCopy("x-forwarded-for", "127.0.0.1"); + headers.addViaCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), false)); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); + EXPECT_CALL(stream_callbacks_, onReset()); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, false); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, false); + + stream->reset(); +} + +TEST_F(AsyncClientImplTest, RemoteResetAfterStreamStart) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addViaCopy("x-envoy-internal", "true"); + headers.addViaCopy("x-forwarded-for", "127.0.0.1"); + headers.addViaCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), false)); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); + EXPECT_CALL(stream_callbacks_, onReset()); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, false); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, false); + + stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset); +} + TEST_F(AsyncClientImplTest, ResetAfterResponseStart) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) @@ -235,6 +531,24 @@ TEST_F(AsyncClientImplTest, ResetAfterResponseStart) { stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset); } +TEST_F(AsyncClientImplTest, ResetStream) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true)); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(stream_callbacks_, onReset()); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(message_->headers(), true); + stream->reset(); +} + TEST_F(AsyncClientImplTest, CancelRequest) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) @@ -251,7 +565,23 @@ TEST_F(AsyncClientImplTest, CancelRequest) { request->cancel(); } -TEST_F(AsyncClientImplTest, DestroyWithActive) { +TEST_F(AsyncClientImplTest, DestroyWithActiveStream) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), false)); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(stream_callbacks_, onReset()); + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(message_->headers(), false); +} + +TEST_F(AsyncClientImplTest, DestroyWithActiveRequest) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { @@ -280,6 +610,33 @@ TEST_F(AsyncClientImplTest, PoolFailure) { EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value()); } +TEST_F(AsyncClientImplTest, StreamTimeout) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true)); + timer_ = new NiceMock(&dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + + TestHeaderMapImpl expected_timeout{ + {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); + EXPECT_CALL(stream_callbacks_, onData(_, true)); + + AsyncClient::Stream* stream = client_.start(stream_callbacks_, std::chrono::milliseconds(40)); + stream->sendHeaders(message_->headers(), true); + timer_->callback_(); + + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_timeout").value()); + EXPECT_EQ(1UL, cm_.conn_pool_.host_->stats().rq_timeout_.value()); + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_504").value()); +} + TEST_F(AsyncClientImplTest, RequestTimeout) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) @@ -319,4 +676,67 @@ TEST_F(AsyncClientImplTest, DisableTimer) { request->cancel(); } +TEST_F(AsyncClientImplTest, DisableTimerWithStream) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + return nullptr; + })); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true)); + timer_ = new NiceMock(&dispatcher_); + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40))); + EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(stream_encoder_.stream_, resetStream(_)); + EXPECT_CALL(stream_callbacks_, onReset()); + + AsyncClient::Stream* stream = client_.start(stream_callbacks_, std::chrono::milliseconds(40)); + stream->sendHeaders(message_->headers(), true); + stream->reset(); +} + +TEST_F(AsyncClientImplTest, MultipleDataStream) { + Buffer::InstancePtr body{new Buffer::OwnedImpl("test body")}; + Buffer::InstancePtr body2{new Buffer::OwnedImpl("test body2")}; + + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) + -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_); + response_decoder_ = &decoder; + return nullptr; + })); + + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + headers.addViaCopy("x-envoy-internal", "true"); + headers.addViaCopy("x-forwarded-for", "127.0.0.1"); + headers.addViaCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers), false)); + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body.get()), false)); + + TestHeaderMapImpl expected_headers{{":status", "200"}}; + EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_headers), false)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); + + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + stream->sendData(*body, false); + + response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), false); + response_decoder_->decodeData(*body, false); + + EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true)); + EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true)); + + stream->sendData(*body2, true); + response_decoder_->decodeData(*body2, true); + + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_200").value()); + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("internal.upstream_rq_200").value()); +} + } // Http diff --git a/test/mocks/http/mocks.cc b/test/mocks/http/mocks.cc index 27bf8161cb5f..c073eb5e8bd4 100644 --- a/test/mocks/http/mocks.cc +++ b/test/mocks/http/mocks.cc @@ -112,6 +112,9 @@ MockAsyncClient::~MockAsyncClient() {} MockAsyncClientCallbacks::MockAsyncClientCallbacks() {} MockAsyncClientCallbacks::~MockAsyncClientCallbacks() {} +MockAsyncClientStreamCallbacks::MockAsyncClientStreamCallbacks() {} +MockAsyncClientStreamCallbacks::~MockAsyncClientStreamCallbacks() {} + MockAsyncClientRequest::MockAsyncClientRequest(MockAsyncClient* client) : client_(client) {} MockAsyncClientRequest::~MockAsyncClientRequest() { client_->onRequestDestroy(); } diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 8a924b5b840e..ab37178479bc 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -298,6 +298,9 @@ class MockAsyncClient : public AsyncClient { MOCK_METHOD3(send_, Request*(MessagePtr& request, Callbacks& callbacks, const Optional& timeout)); + + MOCK_METHOD2(start, Stream*(StreamCallbacks& callbacks, + const Optional& timeout)); }; class MockAsyncClientCallbacks : public AsyncClient::Callbacks { @@ -312,6 +315,22 @@ class MockAsyncClientCallbacks : public AsyncClient::Callbacks { MOCK_METHOD1(onFailure, void(Http::AsyncClient::FailureReason reason)); }; +class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks { +public: + MockAsyncClientStreamCallbacks(); + ~MockAsyncClientStreamCallbacks(); + + void onHeaders(HeaderMapPtr&& headers, bool end_stream) override { + onHeaders_(*headers, end_stream); + } + void onTrailers(HeaderMapPtr&& trailers) override { onTrailers_(*trailers); } + + MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream)); + MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream)); + MOCK_METHOD1(onTrailers_, void(HeaderMap& headers)); + MOCK_METHOD0(onReset, void()); +}; + class MockAsyncClientRequest : public AsyncClient::Request { public: MockAsyncClientRequest(MockAsyncClient* client);