From b16f5299e45ca71a36ec6f7fc006ed311b58a843 Mon Sep 17 00:00:00 2001 From: Andres Guedez <34292400+AndresGuedez@users.noreply.github.com> Date: Thu, 4 Oct 2018 11:45:18 -0400 Subject: [PATCH] network: reintroduce #4382 (delayed conn close) with segv fix (#4587) Re-enable the changes reverted in 9d32e5c2a14cd9ab96b6e77fb04f7bd77b2c0d71, which were originally merged as part of #4382. Signed-off-by: Andres Guedez --- .../v2/http_connection_manager.proto | 1 - docs/root/intro/version_history.rst | 1 + include/envoy/network/connection.h | 17 +- source/common/http/conn_manager_config.h | 7 + source/common/http/conn_manager_impl.cc | 16 +- source/common/http/http1/conn_pool.cc | 2 +- source/common/http/http2/conn_pool.cc | 2 +- source/common/network/connection_impl.cc | 68 +++- source/common/network/connection_impl.h | 13 +- source/common/tcp/conn_pool.cc | 2 +- source/common/tcp_proxy/tcp_proxy.cc | 2 +- .../network/http_connection_manager/config.cc | 3 +- .../network/http_connection_manager/config.h | 2 + .../network/redis_proxy/proxy_filter.cc | 2 +- .../stat_sinks/common/statsd/statsd.cc | 2 +- source/server/http/admin.h | 1 + .../http/conn_manager_impl_fuzz_test.cc | 2 + test/common/http/conn_manager_impl_test.cc | 21 +- test/common/http/conn_manager_utility_test.cc | 1 + test/common/network/connection_impl_test.cc | 296 +++++++++++++++++- test/integration/http2_integration_test.cc | 59 ++++ test/integration/http_integration.cc | 26 +- test/integration/http_integration.h | 4 +- test/integration/integration_test.cc | 108 +++++++ test/integration/utility.h | 9 +- test/mocks/network/mocks.h | 4 + 26 files changed, 639 insertions(+), 32 deletions(-) diff --git a/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto b/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto index 1a7042c6a2e7..4c8c93acce67 100644 --- a/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto +++ b/api/envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto @@ -175,7 +175,6 @@ message HttpConnectionManager { // option is not specified. google.protobuf.Duration drain_timeout = 12 [(gogoproto.stdduration) = true]; - // [#not-implemented-hide:] // The delayed close timeout is for downstream connections managed by the HTTP connection manager. // It is defined as a grace period after connection close processing has been locally initiated // during which Envoy will flush the write buffers for the connection and await the peer to close diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 952fac8dd51f..048dbf15125a 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -60,6 +60,7 @@ Version history dynamic table size of both: encoder and decoder. * http: added support for removing request headers using :ref:`request_headers_to_remove `. +* http: added support for a :ref:`delayed close timeout` to mitigate race conditions when closing connections to downstream HTTP clients. The timeout defaults to 1 second. * jwt-authn filter: add support for per route JWT requirements. * listeners: added the ability to match :ref:`FilterChain ` using :ref:`destination_port ` and diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index ca5e390b4e11..899dc5fe7c4f 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -64,7 +64,9 @@ class ConnectionCallbacks { */ enum class ConnectionCloseType { FlushWrite, // Flush pending write data before raising ConnectionEvent::LocalClose - NoFlush // Do not flush any pending data and immediately raise ConnectionEvent::LocalClose + NoFlush, // Do not flush any pending data and immediately raise ConnectionEvent::LocalClose + FlushWriteAndDelay // Flush pending write data and delay raising a ConnectionEvent::LocalClose + // until the delayed_close_timeout expires }; /** @@ -87,6 +89,8 @@ class Connection : public Event::DeferredDeletable, public FilterManager { Stats::Gauge& write_current_; // Counter* as this is an optional counter. Bind errors will not be tracked if this is nullptr. Stats::Counter* bind_errors_; + // Optional counter. Delayed close timeouts will not be tracked if this is nullptr. + Stats::Counter* delayed_close_timeouts_; }; virtual ~Connection() {} @@ -243,6 +247,17 @@ class Connection : public Event::DeferredDeletable, public FilterManager { */ virtual RequestInfo::FilterState& perConnectionState() PURE; virtual const RequestInfo::FilterState& perConnectionState() const PURE; + + /** + * Set the timeout for delayed connection close()s. + * @param timeout The timeout value in milliseconds + */ + virtual void setDelayedCloseTimeout(std::chrono::milliseconds timeout) PURE; + + /** + * @return std::chrono::milliseconds The delayed close timeout value. + */ + virtual std::chrono::milliseconds delayedCloseTimeout() const PURE; }; typedef std::unique_ptr ConnectionPtr; diff --git a/source/common/http/conn_manager_config.h b/source/common/http/conn_manager_config.h index 0cf7915c0364..e10875ecd807 100644 --- a/source/common/http/conn_manager_config.h +++ b/source/common/http/conn_manager_config.h @@ -39,6 +39,7 @@ namespace Http { COUNTER (downstream_cx_drain_close) \ COUNTER (downstream_cx_idle_timeout) \ COUNTER (downstream_cx_overload_disable_keepalive) \ + COUNTER (downstream_cx_delayed_close_timeout) \ COUNTER (downstream_flow_control_paused_reading_total) \ COUNTER (downstream_flow_control_resumed_reading_total) \ COUNTER (downstream_rq_total) \ @@ -225,6 +226,12 @@ class ConnectionManagerConfig { */ virtual std::chrono::milliseconds streamIdleTimeout() const PURE; + /** + * @return delayed close timeout for downstream HTTP connections. Zero indicates a disabled + * timeout. See http_connection_manager.proto for a detailed description of this timeout. + */ + virtual std::chrono::milliseconds delayedCloseTimeout() const PURE; + /** * @return Router::RouteConfigProvider& the configuration provider used to acquire a route * config for each request flow. diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index ac4da676001c..bd23feb88ba5 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -98,10 +98,12 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal idle_timer_->enableTimer(config_.idleTimeout().value()); } + read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout()); + read_callbacks_->connection().setConnectionStats( {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_, stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_, - nullptr}); + nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_}); } ConnectionManagerImpl::~ConnectionManagerImpl() { @@ -129,7 +131,7 @@ ConnectionManagerImpl::~ConnectionManagerImpl() { void ConnectionManagerImpl::checkForDeferredClose() { if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) { - read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay); } } @@ -244,12 +246,12 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), e.what()); stats_.named_.downstream_cx_protocol_error_.inc(); - // In the protocol error case, we need to reset all streams now. Since we do a flush write, - // the connection might stick around long enough for a pending stream to come back and try - // to encode. + // In the protocol error case, we need to reset all streams now. Since we do a flush write and + // delayed close, the connection might stick around long enough for a pending stream to come + // back and try to encode. resetAllStreams(); - read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay); return Network::FilterStatus::StopIteration; } @@ -328,6 +330,8 @@ void ConnectionManagerImpl::onIdleTimeout() { ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection()); stats_.named_.downstream_cx_idle_timeout_.inc(); if (!codec_) { + // No need to delay close after flushing since an idle timeout has already fired. Attempt to + // write out buffered data one last time and issue a local close if successful. read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); } else if (drain_state_ == DrainState::NotDraining) { startDrainSequence(); diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 607063e0e24b..2858a3576ebd 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -335,7 +335,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_, - &parent_.host_->cluster().stats().bind_errors_}); + &parent_.host_->cluster().stats().bind_errors_, nullptr}); } ConnPoolImpl::ActiveClient::~ActiveClient() { diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 033cb1e2bb6b..baa5b0f6583c 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -242,7 +242,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_, - &parent_.host_->cluster().stats().bind_errors_}); + &parent_.host_->cluster().stats().bind_errors_, nullptr}); } ConnPoolImpl::ActiveClient::~ActiveClient() { diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 47f44daa4fd2..28b751366c37 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -68,7 +68,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt } ConnectionImpl::~ConnectionImpl() { - ASSERT(fd() == -1, "ConnectionImpl was unexpectedly torn down without being closed."); + ASSERT(fd() == -1 && delayed_close_timer_ == nullptr, + "ConnectionImpl was unexpectedly torn down without being closed."); // In general we assume that owning code has called close() previously to the destructor being // run. This generally must be done so that callbacks run in the correct context (vs. deferred @@ -106,10 +107,51 @@ void ConnectionImpl::close(ConnectionCloseType type) { closeSocket(ConnectionEvent::LocalClose); } else { - // TODO(mattklein123): We need a flush timer here. We might never get open socket window. - ASSERT(type == ConnectionCloseType::FlushWrite); - close_with_flush_ = true; + ASSERT(type == ConnectionCloseType::FlushWrite || + type == ConnectionCloseType::FlushWriteAndDelay); + + // No need to continue if a FlushWrite/FlushWriteAndDelay has already been issued and there is a + // pending delayed close. + // + // An example of this condition manifests when a downstream connection is closed early by Envoy, + // such as when a route can't be matched: + // In ConnectionManagerImpl::onData() + // 1) Via codec_->dispatch(), a local reply with a 404 is sent to the client + // a) ConnectionManagerImpl::doEndStream() issues the first connection close() via + // ConnectionManagerImpl::checkForDeferredClose() + // 2) A second close is issued by a subsequent call to + // ConnectionManagerImpl::checkForDeferredClose() prior to returning from onData() + if (delayed_close_) { + return; + } + + delayed_close_ = true; + const bool delayed_close_timeout_set = delayedCloseTimeout().count() > 0; + + // NOTE: the delayed close timeout (if set) affects both FlushWrite and FlushWriteAndDelay + // closes: + // 1. For FlushWrite, the timeout sets an upper bound on how long to wait for the flush to + // complete before the connection is locally closed. + // 2. For FlushWriteAndDelay, the timeout specifies an upper bound on how long to wait for the + // flush to complete and the peer to close the connection before it is locally closed. + + // All close types that follow do not actually close() the socket immediately so that buffered + // data can be written. However, we do want to stop reading to apply TCP backpressure. read_enabled_ = false; + + // Force a closeSocket() after the write buffer is flushed if the close_type calls for it or if + // no delayed close timeout is set. + close_after_flush_ = !delayed_close_timeout_set || type == ConnectionCloseType::FlushWrite; + + // Create and activate a timer which will immediately close the connection if triggered. + // A config value of 0 disables the timeout. + if (delayed_close_timeout_set) { + delayed_close_timer_ = dispatcher_.createTimer([this]() -> void { onDelayedCloseTimeout(); }); + ENVOY_CONN_LOG(debug, "setting delayed close timer with timeout {} ms", *this, + delayedCloseTimeout().count()); + delayed_close_timer_->enableTimer(delayedCloseTimeout()); + } + file_event_->setEnabled(Event::FileReadyType::Write | (enable_half_close_ ? 0 : Event::FileReadyType::Closed)); } @@ -118,7 +160,7 @@ void ConnectionImpl::close(ConnectionCloseType type) { Connection::State ConnectionImpl::state() const { if (fd() == -1) { return State::Closed; - } else if (close_with_flush_) { + } else if (delayed_close_) { return State::Closing; } else { return State::Open; @@ -130,6 +172,12 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) { return; } + // No need for a delayed close (if pending) now that the socket is being closed. + if (delayed_close_timer_) { + delayed_close_timer_->disableTimer(); + delayed_close_timer_ = nullptr; + } + ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast(close_type)); transport_socket_->closeSocket(close_type); @@ -488,7 +536,7 @@ void ConnectionImpl::onWriteReady() { // write callback. This can happen if we manage to complete the SSL handshake in the write // callback, raise a connected event, and close the connection. closeSocket(ConnectionEvent::RemoteClose); - } else if ((close_with_flush_ && new_buffer_size == 0) || bothSidesHalfClosed()) { + } else if ((close_after_flush_ && new_buffer_size == 0) || bothSidesHalfClosed()) { ENVOY_CONN_LOG(debug, "write flush complete", *this); closeSocket(ConnectionEvent::LocalClose); } else if (result.action_ == PostIoAction::KeepOpen && result.bytes_processed_ > 0) { @@ -535,6 +583,14 @@ bool ConnectionImpl::bothSidesHalfClosed() { return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0; } +void ConnectionImpl::onDelayedCloseTimeout() { + ENVOY_CONN_LOG(debug, "triggered delayed close", *this); + if (connection_stats_ != nullptr && connection_stats_->delayed_close_timeouts_ != nullptr) { + connection_stats_->delayed_close_timeouts_->inc(); + } + closeSocket(ConnectionEvent::LocalClose); +} + ClientConnectionImpl::ClientConnectionImpl( Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address, const Network::Address::InstanceConstSharedPtr& source_address, diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 69c9e97620c8..b71fd0127e7c 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -121,6 +121,11 @@ class ConnectionImpl : public virtual Connection, // Obtain global next connection ID. This should only be used in tests. static uint64_t nextGlobalIdForTest() { return next_global_id_; } + void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override { + delayed_close_timeout_ = timeout; + } + std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; } + protected: void closeSocket(ConnectionEvent close_type); @@ -137,6 +142,7 @@ class ConnectionImpl : public virtual Connection, // a generic pointer. Buffer::InstancePtr write_buffer_; uint32_t read_buffer_limit_ = 0; + std::chrono::milliseconds delayed_close_timeout_{0}; protected: bool connecting_{false}; @@ -157,14 +163,19 @@ class ConnectionImpl : public virtual Connection, // Returns true iff end of stream has been both written and read. bool bothSidesHalfClosed(); + // Callback issued when a delayed close timeout triggers. + void onDelayedCloseTimeout(); + static std::atomic next_global_id_; Event::Dispatcher& dispatcher_; const uint64_t id_; + Event::TimerPtr delayed_close_timer_; std::list callbacks_; std::list bytes_sent_callbacks_; bool read_enabled_{true}; - bool close_with_flush_{false}; + bool close_after_flush_{false}; + bool delayed_close_{false}; bool above_high_watermark_{false}; bool detect_early_close_{true}; bool enable_half_close_{false}; diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index 21a7f6581ece..bd887b17f424 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -338,7 +338,7 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent) parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_, - &parent_.host_->cluster().stats().bind_errors_}); + &parent_.host_->cluster().stats().bind_errors_, nullptr}); // We just universally set no delay on connections. Theoretically we might at some point want // to make this configurable. diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 5463cc348785..c44e35c85c94 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -206,7 +206,7 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec {config_->stats().downstream_cx_rx_bytes_total_, config_->stats().downstream_cx_rx_bytes_buffered_, config_->stats().downstream_cx_tx_bytes_total_, - config_->stats().downstream_cx_tx_bytes_buffered_, nullptr}); + config_->stats().downstream_cx_tx_bytes_buffered_, nullptr, nullptr}); } } diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index aaeeb99ba38a..c80e3983082e 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -153,7 +153,8 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig( date_provider_(date_provider), listener_stats_(Http::ConnectionManagerImpl::generateListenerStats(stats_prefix_, context_.listenerScope())), - proxy_100_continue_(config.proxy_100_continue()) { + proxy_100_continue_(config.proxy_100_continue()), + delayed_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, delayed_close_timeout, 1000)) { route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_, route_config_provider_manager_); diff --git a/source/extensions/filters/network/http_connection_manager/config.h b/source/extensions/filters/network/http_connection_manager/config.h index 8e252a8acd6e..a27d609d4d04 100644 --- a/source/extensions/filters/network/http_connection_manager/config.h +++ b/source/extensions/filters/network/http_connection_manager/config.h @@ -133,6 +133,7 @@ class HttpConnectionManagerConfig : Logger::Loggable, Http::ConnectionManagerListenerStats& listenerStats() override { return listener_stats_; } bool proxy100Continue() const override { return proxy_100_continue_; } const Http::Http1Settings& http1Settings() const override { return http1_settings_; } + std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; } private: typedef std::list FilterFactoriesList; @@ -170,6 +171,7 @@ class HttpConnectionManagerConfig : Logger::Loggable, Http::DateProvider& date_provider_; Http::ConnectionManagerListenerStats listener_stats_; const bool proxy_100_continue_; + std::chrono::milliseconds delayed_close_timeout_; // Default idle timeout is 5 minutes if nothing is specified in the HCM config. static const uint64_t StreamIdleTimeoutMs = 5 * 60 * 1000; diff --git a/source/extensions/filters/network/redis_proxy/proxy_filter.cc b/source/extensions/filters/network/redis_proxy/proxy_filter.cc index e64fb795f3d4..00b07511922b 100644 --- a/source/extensions/filters/network/redis_proxy/proxy_filter.cc +++ b/source/extensions/filters/network/redis_proxy/proxy_filter.cc @@ -49,7 +49,7 @@ void ProxyFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& ca config_->stats_.downstream_cx_rx_bytes_buffered_, config_->stats_.downstream_cx_tx_bytes_total_, config_->stats_.downstream_cx_tx_bytes_buffered_, - nullptr}); + nullptr, nullptr}); } void ProxyFilter::onRespValue(RespValuePtr&& value) { diff --git a/source/extensions/stat_sinks/common/statsd/statsd.cc b/source/extensions/stat_sinks/common/statsd/statsd.cc index 6882d10c6803..ba848e4aac26 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.cc +++ b/source/extensions/stat_sinks/common/statsd/statsd.cc @@ -244,7 +244,7 @@ void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) { parent_.cluster_info_->stats().upstream_cx_rx_bytes_buffered_, parent_.cluster_info_->stats().upstream_cx_tx_bytes_total_, parent_.cluster_info_->stats().upstream_cx_tx_bytes_buffered_, - &parent_.cluster_info_->stats().bind_errors_}); + &parent_.cluster_info_->stats().bind_errors_, nullptr}); connection_->connect(); } diff --git a/source/server/http/admin.h b/source/server/http/admin.h index 591c7611134b..c2df3acdcc42 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -100,6 +100,7 @@ class AdminImpl : public Admin, bool generateRequestId() override { return false; } absl::optional idleTimeout() const override { return idle_timeout_; } std::chrono::milliseconds streamIdleTimeout() const override { return {}; } + std::chrono::milliseconds delayedCloseTimeout() const override { return {}; } Router::RouteConfigProvider& routeConfigProvider() override { return route_config_provider_; } const std::string& serverName() override { return Http::DefaultServerString::get(); } Http::ConnectionManagerStats& stats() override { return stats_; } diff --git a/test/common/http/conn_manager_impl_fuzz_test.cc b/test/common/http/conn_manager_impl_fuzz_test.cc index 2e6f2bcea62c..7fb359966cdf 100644 --- a/test/common/http/conn_manager_impl_fuzz_test.cc +++ b/test/common/http/conn_manager_impl_fuzz_test.cc @@ -91,6 +91,7 @@ class FuzzConfig : public ConnectionManagerConfig { bool generateRequestId() override { return true; } absl::optional idleTimeout() const override { return idle_timeout_; } std::chrono::milliseconds streamIdleTimeout() const override { return stream_idle_timeout_; } + std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; } Router::RouteConfigProvider& routeConfigProvider() override { return route_config_provider_; } const std::string& serverName() override { return server_name_; } ConnectionManagerStats& stats() override { return stats_; } @@ -129,6 +130,7 @@ class FuzzConfig : public ConnectionManagerConfig { ConnectionManagerTracingStats tracing_stats_; ConnectionManagerListenerStats listener_stats_; std::chrono::milliseconds stream_idle_timeout_{}; + std::chrono::milliseconds delayed_close_timeout_{}; bool use_remote_address_{true}; Http::ForwardClientCertType forward_client_cert_{Http::ForwardClientCertType::Sanitize}; std::vector set_current_client_cert_details_; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 0f1d02978fa7..017a7bb1f3a3 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -259,6 +259,7 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi bool generateRequestId() override { return true; } absl::optional idleTimeout() const override { return idle_timeout_; } std::chrono::milliseconds streamIdleTimeout() const override { return stream_idle_timeout_; } + std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; } Router::RouteConfigProvider& routeConfigProvider() override { return route_config_provider_; } const std::string& serverName() override { return server_name_; } ConnectionManagerStats& stats() override { return stats_; } @@ -305,6 +306,7 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi absl::optional user_agent_; absl::optional idle_timeout_; std::chrono::milliseconds stream_idle_timeout_{}; + std::chrono::milliseconds delayed_close_timeout_{}; NiceMock random_; NiceMock local_info_; NiceMock factory_context_; @@ -1940,7 +1942,8 @@ TEST_F(HttpConnectionManagerImplTest, DrainClose) { EXPECT_EQ(ssl_connection_.get(), filter->callbacks_->connection()->ssl()); EXPECT_CALL(*codec_, goAway()); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWriteAndDelay)); EXPECT_CALL(*drain_timer, disableTimer()); drain_timer->callback_(); @@ -1976,7 +1979,8 @@ TEST_F(HttpConnectionManagerImplTest, ResponseBeforeRequestComplete) { EXPECT_STREQ("envoy-server-test", headers.Server()->value().c_str()); })); EXPECT_CALL(*decoder_filters_[0], onDestroy()); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWriteAndDelay)); HeaderMapPtr response_headers{new TestHeaderMapImpl{{":status", "200"}}}; decoder_filters_[0]->callbacks_->encodeHeaders(std::move(response_headers), true); @@ -2029,7 +2033,8 @@ TEST_F(HttpConnectionManagerImplTest, ResponseStartBeforeRequestComplete) { // Since we started the response before the request was complete, we will still close the // connection since we already sent a connection: close header. We won't "reset" the stream // however. - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWriteAndDelay)); Buffer::OwnedImpl fake_response("world"); filter->callbacks_->encodeData(fake_response, true); } @@ -2065,8 +2070,11 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) { EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0); - // A protocol exception should result in reset of the streams followed by a local close. - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + // A protocol exception should result in reset of the streams followed by a remote or local close + // depending on whether the downstream client closes the connection prior to the delayed close + // timer firing. + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWriteAndDelay)); // Kick off the incoming data. Buffer::OwnedImpl fake_input("1234"); @@ -2132,7 +2140,8 @@ TEST_F(HttpConnectionManagerImplTest, IdleTimeout) { idle_timer->callback_(); EXPECT_CALL(*codec_, goAway()); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWriteAndDelay)); EXPECT_CALL(*idle_timer, disableTimer()); EXPECT_CALL(*drain_timer, disableTimer()); drain_timer->callback_(); diff --git a/test/common/http/conn_manager_utility_test.cc b/test/common/http/conn_manager_utility_test.cc index b49fe5d3f9f4..65a9325c5207 100644 --- a/test/common/http/conn_manager_utility_test.cc +++ b/test/common/http/conn_manager_utility_test.cc @@ -51,6 +51,7 @@ class MockConnectionManagerConfig : public ConnectionManagerConfig { MOCK_METHOD0(generateRequestId, bool()); MOCK_CONST_METHOD0(idleTimeout, absl::optional()); MOCK_CONST_METHOD0(streamIdleTimeout, std::chrono::milliseconds()); + MOCK_CONST_METHOD0(delayedCloseTimeout, std::chrono::milliseconds()); MOCK_METHOD0(routeConfigProvider, Router::RouteConfigProvider&()); MOCK_METHOD0(serverName, const std::string&()); MOCK_METHOD0(stats, ConnectionManagerStats&()); diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index db7a63c67624..f85e2ea92bb7 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -169,6 +169,35 @@ class ConnectionImplTest : public testing::TestWithParam { } protected: + struct ConnectionMocks { + std::unique_ptr> dispatcher; + Event::MockTimer* timer; + std::unique_ptr> transport_socket; + }; + + ConnectionMocks createConnectionMocks() { + auto dispatcher = std::make_unique>(); + EXPECT_CALL(dispatcher->buffer_factory_, create_(_, _)) + .WillRepeatedly(Invoke([](std::function below_low, + std::function above_high) -> Buffer::Instance* { + // ConnectionImpl calls Envoy::MockBufferFactory::create(), which calls create_() and + // wraps the returned raw pointer below with a unique_ptr. + return new Buffer::WatermarkBuffer(below_low, above_high); + })); + + // This timer will be returned (transferring ownership) to the ConnectionImpl when createTimer() + // is called to allocate the delayed close timer. + auto timer = new Event::MockTimer(dispatcher.get()); + + auto file_event = std::make_unique>(); + EXPECT_CALL(*dispatcher, createFileEvent_(0, _, _, _)).WillOnce(Return(file_event.release())); + + auto transport_socket = std::make_unique>(); + EXPECT_CALL(*transport_socket, canFlushClose()).WillOnce(Return(true)); + + return ConnectionMocks{std::move(dispatcher), timer, std::move(transport_socket)}; + } + Event::SimulatedTimeSystem time_system_; Event::DispatcherPtr dispatcher_; Stats::IsolatedStoreImpl stats_store_; @@ -358,7 +387,8 @@ TEST_P(ConnectionImplTest, SocketOptionsFailureTest) { struct MockConnectionStats { Connection::ConnectionStats toBufferStats() { - return {rx_total_, rx_current_, tx_total_, tx_current_, &bind_errors_}; + return {rx_total_, rx_current_, tx_total_, + tx_current_, &bind_errors_, &delayed_close_timeouts_}; } StrictMock rx_total_; @@ -366,6 +396,21 @@ struct MockConnectionStats { StrictMock tx_total_; StrictMock tx_current_; StrictMock bind_errors_; + StrictMock delayed_close_timeouts_; +}; + +struct NiceMockConnectionStats { + Connection::ConnectionStats toBufferStats() { + return {rx_total_, rx_current_, tx_total_, + tx_current_, &bind_errors_, &delayed_close_timeouts_}; + } + + NiceMock rx_total_; + NiceMock rx_current_; + NiceMock tx_total_; + NiceMock tx_current_; + NiceMock bind_errors_; + NiceMock delayed_close_timeouts_; }; TEST_P(ConnectionImplTest, ConnectionStats) { @@ -870,6 +915,255 @@ TEST_P(ConnectionImplTest, EmptyReadOnCloseTest) { disconnect(true); } +// Test that a FlushWrite close immediately triggers a close after the write buffer is flushed. +TEST_P(ConnectionImplTest, FlushWriteCloseTest) { + setUpBasicConnection(); + connect(); + + InSequence s1; + + time_system_.setMonotonicTime(std::chrono::milliseconds(0)); + server_connection_->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + std::shared_ptr client_read_filter(new NiceMock()); + client_connection_->addReadFilter(client_read_filter); + + NiceMockConnectionStats stats; + server_connection_->setConnectionStats(stats.toBufferStats()); + + Buffer::OwnedImpl data("data"); + server_connection_->write(data, false); + + // Server connection flushes the write and immediately closes the socket. + // There shouldn't be a read/close race here (see issue #2929), since the client is blocked on + // reading and the connection should close gracefully via FIN. + + EXPECT_CALL(stats.delayed_close_timeouts_, inc()).Times(0); + EXPECT_CALL(server_callbacks_, onEvent(ConnectionEvent::LocalClose)).Times(1); + EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("data"), false)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&]() -> FilterStatus { + time_system_.setMonotonicTime(std::chrono::milliseconds(50)); + dispatcher_->exit(); + return FilterStatus::StopIteration; + })); + EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::RemoteClose)).Times(1); + server_connection_->close(ConnectionCloseType::FlushWrite); + dispatcher_->run(Event::Dispatcher::RunType::Block); +} + +// Test that a FlushWrite close will create and enable a timer which closes the connection when +// triggered. +TEST_P(ConnectionImplTest, FlushWriteCloseTimeoutTest) { + ConnectionMocks mocks = createConnectionMocks(); + auto server_connection = std::make_unique( + *mocks.dispatcher, std::make_unique(0, nullptr, nullptr), + std::move(mocks.transport_socket), true); + + InSequence s1; + + // Enable delayed connection close processing by setting a non-zero timeout value. The actual + // value (> 0) doesn't matter since the callback is triggered below. + server_connection->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + NiceMockConnectionStats stats; + server_connection->setConnectionStats(stats.toBufferStats()); + + Buffer::OwnedImpl data("data"); + server_connection->write(data, false); + + // Data is pending in the write buffer, which will trigger the FlushWrite close to go into delayed + // close processing. + EXPECT_CALL(*mocks.timer, enableTimer(_)).Times(1); + server_connection->close(ConnectionCloseType::FlushWrite); + + EXPECT_CALL(stats.delayed_close_timeouts_, inc()).Times(1); + // Since the callback is being invoked manually, disableTimer() will be called when the connection + // is closed by the callback. + EXPECT_CALL(*mocks.timer, disableTimer()).Times(1); + // Issue the delayed close callback to ensure connection is closed. + mocks.timer->callback_(); +} + +// Test that a FlushWriteAndDelay close causes Envoy to flush the write and wait for the client/peer +// to close (until a configured timeout which is not expected to trigger in this test). +TEST_P(ConnectionImplTest, FlushWriteAndDelayCloseTest) { +#ifdef __APPLE__ + // libevent does not provide early close notifications on the currently supported macOS builds, so + // the server connection is never notified of the close. For now, we have chosen to disable tests + // that rely on this behavior on macOS (see https://github.com/envoyproxy/envoy/pull/4299). + return; +#endif + setUpBasicConnection(); + connect(); + + InSequence s1; + + time_system_.setMonotonicTime(std::chrono::milliseconds(0)); + server_connection_->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + std::shared_ptr client_read_filter(new NiceMock()); + client_connection_->addReadFilter(client_read_filter); + + NiceMockConnectionStats stats; + server_connection_->setConnectionStats(stats.toBufferStats()); + + Buffer::OwnedImpl data("Connection: Close"); + server_connection_->write(data, false); + + EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("Connection: Close"), false)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&]() -> FilterStatus { + // Advance time by 50ms; delayed close timer should _not_ trigger. + time_system_.setMonotonicTime(std::chrono::milliseconds(50)); + client_connection_->close(ConnectionCloseType::NoFlush); + return FilterStatus::StopIteration; + })); + + // Client closes the connection so delayed close timer on the server conn should not fire. + EXPECT_CALL(stats.delayed_close_timeouts_, inc()).Times(0); + EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::LocalClose)).Times(1); + EXPECT_CALL(server_callbacks_, onEvent(ConnectionEvent::RemoteClose)) + .Times(1) + .WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_->exit(); })); + server_connection_->close(ConnectionCloseType::FlushWriteAndDelay); + dispatcher_->run(Event::Dispatcher::RunType::Block); +} + +// Test that a FlushWriteAndDelay close triggers a timeout which forces Envoy to close the +// connection when a client has not issued a close within the configured interval. +TEST_P(ConnectionImplTest, FlushWriteAndDelayCloseTimerTriggerTest) { + setUpBasicConnection(); + connect(); + + InSequence s1; + + // This timer will be forced to trigger by ensuring time advances by >50ms during the test. + server_connection_->setDelayedCloseTimeout(std::chrono::milliseconds(50)); + + std::shared_ptr client_read_filter(new NiceMock()); + client_connection_->addReadFilter(client_read_filter); + + NiceMockConnectionStats stats; + server_connection_->setConnectionStats(stats.toBufferStats()); + + Buffer::OwnedImpl data("Connection: Close"); + server_connection_->write(data, false); + + time_system_.setMonotonicTime(std::chrono::milliseconds(0)); + + // The client _will not_ close the connection. Instead, expect the delayed close timer to trigger + // on the server connection. + EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("Connection: Close"), false)) + .Times(1) + .WillOnce(InvokeWithoutArgs([&]() -> FilterStatus { + time_system_.setMonotonicTime(std::chrono::milliseconds(100)); + return FilterStatus::StopIteration; + })); + server_connection_->close(ConnectionCloseType::FlushWriteAndDelay); + EXPECT_CALL(stats.delayed_close_timeouts_, inc()).Times(1); + EXPECT_CALL(server_callbacks_, onEvent(ConnectionEvent::LocalClose)).Times(1); + EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::RemoteClose)) + .Times(1) + .WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_->exit(); })); + dispatcher_->run(Event::Dispatcher::RunType::Block); +} + +// Test that delayed close processing can be disabled by setting the delayed close timeout interval +// to 0. +TEST_P(ConnectionImplTest, FlushWriteAndDelayConfigDisabledTest) { + InSequence s1; + + NiceMock callbacks; + NiceMock dispatcher; + EXPECT_CALL(dispatcher.buffer_factory_, create_(_, _)) + .WillRepeatedly(Invoke([](std::function below_low, + std::function above_high) -> Buffer::Instance* { + return new Buffer::WatermarkBuffer(below_low, above_high); + })); + std::unique_ptr server_connection(new Network::ConnectionImpl( + dispatcher, std::make_unique(0, nullptr, nullptr), + std::make_unique>(), true)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(0)); + + // Ensure the delayed close timer is not created when the delayedCloseTimeout config value is set + // to 0. + server_connection->setDelayedCloseTimeout(std::chrono::milliseconds(0)); + EXPECT_CALL(dispatcher, createTimer_(_)).Times(0); + + NiceMockConnectionStats stats; + server_connection->setConnectionStats(stats.toBufferStats()); + + EXPECT_CALL(stats.delayed_close_timeouts_, inc()).Times(0); + server_connection->close(ConnectionCloseType::FlushWriteAndDelay); + // Advance time by a value larger than the delayed close timeout default (1000ms). This would + // trigger the delayed close timer callback if set. + time_system_.setMonotonicTime(std::chrono::milliseconds(10000)); + + // Since the delayed close timer never triggers, the connection never closes. Close it here to end + // the test cleanly due to the (fd == -1) assert in ~ConnectionImpl(). + server_connection->close(ConnectionCloseType::NoFlush); +} + +// Test that tearing down the connection will disable the delayed close timer. +TEST_P(ConnectionImplTest, DelayedCloseTimeoutDisableOnSocketClose) { + ConnectionMocks mocks = createConnectionMocks(); + auto server_connection = std::make_unique( + *mocks.dispatcher, std::make_unique(0, nullptr, nullptr), + std::move(mocks.transport_socket), true); + + InSequence s1; + + // The actual timeout is insignificant, we just need to enable delayed close processing by setting + // it to > 0. + server_connection->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + Buffer::OwnedImpl data("data"); + server_connection->write(data, false); + EXPECT_CALL(*mocks.timer, enableTimer(_)).Times(1); + // Enable the delayed close timer. + server_connection->close(ConnectionCloseType::FlushWriteAndDelay); + EXPECT_CALL(*mocks.timer, disableTimer()).Times(1); + // This close() will call closeSocket(), which should disable the timer to avoid triggering it + // after the connection's data structures have been reset. + server_connection->close(ConnectionCloseType::NoFlush); +} + +// Test that the delayed close timeout callback is resilient to connection teardown edge cases. +TEST_P(ConnectionImplTest, DelayedCloseTimeoutNullStats) { + ConnectionMocks mocks = createConnectionMocks(); + auto server_connection = std::make_unique( + *mocks.dispatcher, std::make_unique(0, nullptr, nullptr), + std::move(mocks.transport_socket), true); + + InSequence s1; + + // The actual timeout is insignificant, we just need to enable delayed close processing by setting + // it to > 0. + server_connection->setDelayedCloseTimeout(std::chrono::milliseconds(100)); + + // NOTE: Avoid providing stats storage to the connection via setConnectionStats(). This guarantees + // that connection_stats_ is a nullptr and that the callback resiliency validation below tests + // that edge case. + + Buffer::OwnedImpl data("data"); + server_connection->write(data, false); + + EXPECT_CALL(*mocks.timer, enableTimer(_)).Times(1); + server_connection->close(ConnectionCloseType::FlushWriteAndDelay); + EXPECT_CALL(*mocks.timer, disableTimer()).Times(1); + // Copy the callback since mocks.timer will be freed when closeSocket() is called. + Event::TimerCb callback = mocks.timer->callback_; + // The following close() will call closeSocket() and reset internal data structures such as stats. + server_connection->close(ConnectionCloseType::NoFlush); + // Verify the onDelayedCloseTimeout() callback is resilient to the post closeSocket(), pre + // destruction state. This should not actually happen due to the timeout disablement in + // closeSocket(), but there is enough complexity in connection handling codepaths that being + // extra defensive is valuable. + callback(); +} + class MockTransportConnectionImplTest : public testing::Test { public: MockTransportConnectionImplTest() { diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index f5704c953098..d869a36a7c66 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -13,7 +13,9 @@ #include "gtest/gtest.h" +using ::testing::HasSubstr; using ::testing::MatchesRegex; + namespace Envoy { INSTANTIATE_TEST_CASE_P(IpVersions, Http2IntegrationTest, @@ -390,6 +392,63 @@ TEST_P(Http2IntegrationTest, SimultaneousRequestWithBufferLimits) { simultaneousRequest(1024 * 32, 1024 * 16); } +// Test downstream connection delayed close processing. +TEST_P(Http2IntegrationTest, DelayedCloseAfterBadFrame) { + initialize(); + Buffer::OwnedImpl buffer("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\nhelloworldcauseanerror"); + std::string response; + RawConnectionDriver connection( + lookupPort("http"), buffer, + [&](Network::ClientConnection& connection, const Buffer::Instance& data) -> void { + response.append(data.toString()); + connection.dispatcher().exit(); + }, + version_); + + connection.run(); + EXPECT_THAT(response, HasSubstr("SETTINGS expected")); + // Due to the multiple dispatchers involved (one for the RawConnectionDriver and another for the + // Envoy server), it's possible the delayed close timer could fire and close the server socket + // prior to the data callback above firing. Therefore, we may either still be connected, or have + // received a remote close. + if (connection.last_connection_event() == Network::ConnectionEvent::Connected) { + connection.run(); + } + EXPECT_EQ(connection.last_connection_event(), Network::ConnectionEvent::RemoteClose); + EXPECT_EQ(test_server_->counter("http.config_test.downstream_cx_delayed_close_timeout")->value(), + 1); +} + +// Test disablement of delayed close processing on downstream connections. +TEST_P(Http2IntegrationTest, DelayedCloseDisabled) { + config_helper_.addConfigModifier( + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) { + hcm.mutable_delayed_close_timeout()->set_seconds(0); + }); + initialize(); + Buffer::OwnedImpl buffer("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\nhelloworldcauseanerror"); + std::string response; + RawConnectionDriver connection( + lookupPort("http"), buffer, + [&](Network::ClientConnection& connection, const Buffer::Instance& data) -> void { + response.append(data.toString()); + connection.dispatcher().exit(); + }, + version_); + + connection.run(); + EXPECT_THAT(response, HasSubstr("SETTINGS expected")); + // Due to the multiple dispatchers involved (one for the RawConnectionDriver and another for the + // Envoy server), it's possible for the 'connection' to receive the data and exit the dispatcher + // prior to the FIN being received from the server. + if (connection.last_connection_event() == Network::ConnectionEvent::Connected) { + connection.run(); + } + EXPECT_EQ(connection.last_connection_event(), Network::ConnectionEvent::RemoteClose); + EXPECT_EQ(test_server_->counter("http.config_test.downstream_cx_delayed_close_timeout")->value(), + 0); +} + Http2RingHashIntegrationTest::Http2RingHashIntegrationTest() { config_helper_.addConfigModifier([&](envoy::config::bootstrap::v2::Bootstrap& bootstrap) -> void { auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 247fe11d7ebb..3b8ac5167308 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -156,18 +156,42 @@ IntegrationCodecClient::startRequest(const Http::HeaderMap& headers) { return {encoder, std::move(response)}; } -void IntegrationCodecClient::waitForDisconnect() { +bool IntegrationCodecClient::waitForDisconnect(std::chrono::milliseconds time_to_wait) { + Event::TimerPtr wait_timer; + bool wait_timer_triggered = false; + if (time_to_wait.count()) { + wait_timer = connection_->dispatcher().createTimer([this, &wait_timer_triggered] { + connection_->dispatcher().exit(); + wait_timer_triggered = true; + }); + wait_timer->enableTimer(time_to_wait); + } + connection_->dispatcher().run(Event::Dispatcher::RunType::Block); + + // Disable the timer if it was created. This call is harmless if the timer already triggered. + if (wait_timer) { + wait_timer->disableTimer(); + } + + if (wait_timer_triggered && !disconnected_) { + return false; + } EXPECT_TRUE(disconnected_); + + return true; } void IntegrationCodecClient::ConnectionCallbacks::onEvent(Network::ConnectionEvent event) { + parent_.last_connection_event_ = event; if (event == Network::ConnectionEvent::Connected) { parent_.connected_ = true; parent_.connection_->dispatcher().exit(); } else if (event == Network::ConnectionEvent::RemoteClose) { parent_.disconnected_ = true; parent_.connection_->dispatcher().exit(); + } else { + parent_.disconnected_ = true; } } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 9c7bb5d0fb4b..0559715cc784 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -34,8 +34,9 @@ class IntegrationCodecClient : public Http::CodecClientProd { void sendReset(Http::StreamEncoder& encoder); std::pair startRequest(const Http::HeaderMap& headers); - void waitForDisconnect(); + bool waitForDisconnect(std::chrono::milliseconds time_to_wait = std::chrono::milliseconds(0)); Network::ClientConnection* connection() const { return connection_.get(); } + Network::ConnectionEvent last_connection_event() const { return last_connection_event_; } private: struct ConnectionCallbacks : public Network::ConnectionCallbacks { @@ -66,6 +67,7 @@ class IntegrationCodecClient : public Http::CodecClientProd { bool connected_{}; bool disconnected_{}; bool saw_goaway_{}; + Network::ConnectionEvent last_connection_event_; }; typedef std::unique_ptr IntegrationCodecClientPtr; diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 730bdb25dc4e..fd1050cce4ea 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -373,4 +373,112 @@ TEST_P(IntegrationTest, ViaAppendWith100Continue) { config_helper_.addConfigModifier(setVia("foo")); } +// Test delayed close semantics for downstream HTTP/1.1 connections. When an early response is +// sent by Envoy, it will wait for response acknowledgment (via FIN/RST) from the client before +// closing the socket (with a timeout for ensuring cleanup). +TEST_P(IntegrationTest, TestDelayedConnectionTeardownOnGracefulClose) { + // This test will trigger an early 413 Payload Too Large response due to buffer limits being + // exceeded. The following filter is needed since the router filter will never trigger a 413. + config_helper_.addFilter("{ name: envoy.http_dynamo_filter, config: {} }"); + config_helper_.setBufferLimits(1024, 1024); + initialize(); + + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = + codec_client_->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + + codec_client_->sendData(*request_encoder_, 1024 * 65, false); + + response->waitForEndStream(); + EXPECT_TRUE(response->complete()); + EXPECT_STREQ("413", response->headers().Status()->value().c_str()); + // With no delayed close processing, Envoy will close the connection immediately after flushing + // and this should instead return true. + EXPECT_FALSE(codec_client_->waitForDisconnect(std::chrono::milliseconds(500))); + + // Issue a local close and check that the client did not pick up a remote close which can happen + // when delayed close semantics are disabled. + codec_client_->connection()->close(Network::ConnectionCloseType::NoFlush); + EXPECT_EQ(codec_client_->last_connection_event(), Network::ConnectionEvent::LocalClose); +} + +// Test configuration of the delayed close timeout on downstream HTTP/1.1 connections. A value of 0 +// disables delayed close processing. +TEST_P(IntegrationTest, TestDelayedConnectionTeardownConfig) { + config_helper_.addFilter("{ name: envoy.http_dynamo_filter, config: {} }"); + config_helper_.setBufferLimits(1024, 1024); + config_helper_.addConfigModifier( + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) { + hcm.mutable_delayed_close_timeout()->set_seconds(0); + }); + initialize(); + + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = + codec_client_->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + + codec_client_->sendData(*request_encoder_, 1024 * 65, false); + + response->waitForEndStream(); + // There is a potential race in the client's response processing when delayed close logic is + // disabled in Envoy (see https://github.com/envoyproxy/envoy/issues/2929). Depending on timing, + // a client may receive an RST prior to reading the response data from the socket, which may clear + // the receive buffers. Also, clients which don't flush the receive buffer upon receiving a remote + // close may also lose data (Envoy is susceptible to this). + // Therefore, avoid checking response code/payload here and instead simply look for the remote + // close. + EXPECT_TRUE(codec_client_->waitForDisconnect(std::chrono::milliseconds(500))); + EXPECT_EQ(codec_client_->last_connection_event(), Network::ConnectionEvent::RemoteClose); +} + +// Test that delay closed connections are eventually force closed when the timeout triggers. +TEST_P(IntegrationTest, TestDelayedConnectionTeardownTimeoutTrigger) { + config_helper_.addFilter("{ name: envoy.http_dynamo_filter, config: {} }"); + config_helper_.setBufferLimits(1024, 1024); + config_helper_.addConfigModifier( + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) { + // 200ms. + hcm.mutable_delayed_close_timeout()->set_nanos(200000000); + }); + + initialize(); + + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = + codec_client_->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + + codec_client_->sendData(*request_encoder_, 1024 * 65, false); + + response->waitForEndStream(); + // The delayed close timeout should trigger since client is not closing the connection. + EXPECT_TRUE(codec_client_->waitForDisconnect(std::chrono::milliseconds(2000))); + EXPECT_EQ(codec_client_->last_connection_event(), Network::ConnectionEvent::RemoteClose); + EXPECT_EQ(test_server_->counter("http.config_test.downstream_cx_delayed_close_timeout")->value(), + 1); +} + } // namespace Envoy diff --git a/test/integration/utility.h b/test/integration/utility.h index 04e5d650eebb..adc290311641 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -65,6 +65,9 @@ class RawConnectionDriver { bool connecting() { return callbacks_->connecting_; } void run(Event::Dispatcher::RunType run_type = Event::Dispatcher::RunType::Block); void close(); + Network::ConnectionEvent last_connection_event() const { + return callbacks_->last_connection_event_; + } private: struct ForwardingFilter : public Network::ReadFilterBaseImpl { @@ -83,11 +86,15 @@ class RawConnectionDriver { }; struct ConnectionCallbacks : public Network::ConnectionCallbacks { - void onEvent(Network::ConnectionEvent) override { connecting_ = false; } + void onEvent(Network::ConnectionEvent event) override { + last_connection_event_ = event; + connecting_ = false; + } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} bool connecting_{true}; + Network::ConnectionEvent last_connection_event_; }; Api::ApiPtr api_; diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 110608479a40..a7b8303d40b5 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -90,6 +90,8 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_CONST_METHOD0(socketOptions, const Network::ConnectionSocket::OptionsSharedPtr&()); MOCK_METHOD0(perConnectionState, RequestInfo::FilterState&()); MOCK_CONST_METHOD0(perConnectionState, const RequestInfo::FilterState&()); + MOCK_METHOD1(setDelayedCloseTimeout, void(std::chrono::milliseconds)); + MOCK_CONST_METHOD0(delayedCloseTimeout, std::chrono::milliseconds()); }; /** @@ -131,6 +133,8 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_CONST_METHOD0(socketOptions, const Network::ConnectionSocket::OptionsSharedPtr&()); MOCK_METHOD0(perConnectionState, RequestInfo::FilterState&()); MOCK_CONST_METHOD0(perConnectionState, const RequestInfo::FilterState&()); + MOCK_METHOD1(setDelayedCloseTimeout, void(std::chrono::milliseconds)); + MOCK_CONST_METHOD0(delayedCloseTimeout, std::chrono::milliseconds()); // Network::ClientConnection MOCK_METHOD0(connect, void());