Skip to content

Commit

Permalink
network: reintroduce #4382 (delayed conn close) with segv fix (#4587)
Browse files Browse the repository at this point in the history
Re-enable the changes reverted in 9d32e5c, which were originally merged as part of #4382.

Signed-off-by: Andres Guedez <[email protected]>
  • Loading branch information
AndresGuedez authored and mattklein123 committed Oct 4, 2018
1 parent 0f42fd6 commit b16f529
Show file tree
Hide file tree
Showing 26 changed files with 639 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ message HttpConnectionManager {
// option is not specified.
google.protobuf.Duration drain_timeout = 12 [(gogoproto.stdduration) = true];

// [#not-implemented-hide:]
// The delayed close timeout is for downstream connections managed by the HTTP connection manager.
// It is defined as a grace period after connection close processing has been locally initiated
// during which Envoy will flush the write buffers for the connection and await the peer to close
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Version history
dynamic table size of both: encoder and decoder.
* http: added support for removing request headers using :ref:`request_headers_to_remove
<envoy_api_field_route.Route.request_headers_to_remove>`.
* http: added support for a :ref:`delayed close timeout<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.delayed_close_timeout>` to mitigate race conditions when closing connections to downstream HTTP clients. The timeout defaults to 1 second.
* jwt-authn filter: add support for per route JWT requirements.
* listeners: added the ability to match :ref:`FilterChain <envoy_api_msg_listener.FilterChain>` using
:ref:`destination_port <envoy_api_field_listener.FilterChainMatch.destination_port>` and
Expand Down
17 changes: 16 additions & 1 deletion include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class ConnectionCallbacks {
*/
enum class ConnectionCloseType {
FlushWrite, // Flush pending write data before raising ConnectionEvent::LocalClose
NoFlush // Do not flush any pending data and immediately raise ConnectionEvent::LocalClose
NoFlush, // Do not flush any pending data and immediately raise ConnectionEvent::LocalClose
FlushWriteAndDelay // Flush pending write data and delay raising a ConnectionEvent::LocalClose
// until the delayed_close_timeout expires
};

/**
Expand All @@ -87,6 +89,8 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
Stats::Gauge& write_current_;
// Counter* as this is an optional counter. Bind errors will not be tracked if this is nullptr.
Stats::Counter* bind_errors_;
// Optional counter. Delayed close timeouts will not be tracked if this is nullptr.
Stats::Counter* delayed_close_timeouts_;
};

virtual ~Connection() {}
Expand Down Expand Up @@ -243,6 +247,17 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
*/
virtual RequestInfo::FilterState& perConnectionState() PURE;
virtual const RequestInfo::FilterState& perConnectionState() const PURE;

/**
* Set the timeout for delayed connection close()s.
* @param timeout The timeout value in milliseconds
*/
virtual void setDelayedCloseTimeout(std::chrono::milliseconds timeout) PURE;

/**
* @return std::chrono::milliseconds The delayed close timeout value.
*/
virtual std::chrono::milliseconds delayedCloseTimeout() const PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
7 changes: 7 additions & 0 deletions source/common/http/conn_manager_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace Http {
COUNTER (downstream_cx_drain_close) \
COUNTER (downstream_cx_idle_timeout) \
COUNTER (downstream_cx_overload_disable_keepalive) \
COUNTER (downstream_cx_delayed_close_timeout) \
COUNTER (downstream_flow_control_paused_reading_total) \
COUNTER (downstream_flow_control_resumed_reading_total) \
COUNTER (downstream_rq_total) \
Expand Down Expand Up @@ -225,6 +226,12 @@ class ConnectionManagerConfig {
*/
virtual std::chrono::milliseconds streamIdleTimeout() const PURE;

/**
* @return delayed close timeout for downstream HTTP connections. Zero indicates a disabled
* timeout. See http_connection_manager.proto for a detailed description of this timeout.
*/
virtual std::chrono::milliseconds delayedCloseTimeout() const PURE;

/**
* @return Router::RouteConfigProvider& the configuration provider used to acquire a route
* config for each request flow.
Expand Down
16 changes: 10 additions & 6 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal
idle_timer_->enableTimer(config_.idleTimeout().value());
}

read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout());

read_callbacks_->connection().setConnectionStats(
{stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
nullptr});
nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
}

ConnectionManagerImpl::~ConnectionManagerImpl() {
Expand Down Expand Up @@ -129,7 +131,7 @@ ConnectionManagerImpl::~ConnectionManagerImpl() {

void ConnectionManagerImpl::checkForDeferredClose() {
if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay);
}
}

Expand Down Expand Up @@ -244,12 +246,12 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool
ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), e.what());
stats_.named_.downstream_cx_protocol_error_.inc();

// In the protocol error case, we need to reset all streams now. Since we do a flush write,
// the connection might stick around long enough for a pending stream to come back and try
// to encode.
// In the protocol error case, we need to reset all streams now. Since we do a flush write and
// delayed close, the connection might stick around long enough for a pending stream to come
// back and try to encode.
resetAllStreams();

read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay);
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -328,6 +330,8 @@ void ConnectionManagerImpl::onIdleTimeout() {
ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection());
stats_.named_.downstream_cx_idle_timeout_.inc();
if (!codec_) {
// No need to delay close after flushing since an idle timeout has already fired. Attempt to
// write out buffered data one last time and issue a local close if successful.
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
} else if (drain_state_ == DrainState::NotDraining) {
startDrainSequence();
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_,
&parent_.host_->cluster().stats().bind_errors_});
&parent_.host_->cluster().stats().bind_errors_, nullptr});
}

ConnPoolImpl::ActiveClient::~ActiveClient() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_,
&parent_.host_->cluster().stats().bind_errors_});
&parent_.host_->cluster().stats().bind_errors_, nullptr});
}

ConnPoolImpl::ActiveClient::~ActiveClient() {
Expand Down
68 changes: 62 additions & 6 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
}

ConnectionImpl::~ConnectionImpl() {
ASSERT(fd() == -1, "ConnectionImpl was unexpectedly torn down without being closed.");
ASSERT(fd() == -1 && delayed_close_timer_ == nullptr,
"ConnectionImpl was unexpectedly torn down without being closed.");

// In general we assume that owning code has called close() previously to the destructor being
// run. This generally must be done so that callbacks run in the correct context (vs. deferred
Expand Down Expand Up @@ -106,10 +107,51 @@ void ConnectionImpl::close(ConnectionCloseType type) {

closeSocket(ConnectionEvent::LocalClose);
} else {
// TODO(mattklein123): We need a flush timer here. We might never get open socket window.
ASSERT(type == ConnectionCloseType::FlushWrite);
close_with_flush_ = true;
ASSERT(type == ConnectionCloseType::FlushWrite ||
type == ConnectionCloseType::FlushWriteAndDelay);

// No need to continue if a FlushWrite/FlushWriteAndDelay has already been issued and there is a
// pending delayed close.
//
// An example of this condition manifests when a downstream connection is closed early by Envoy,
// such as when a route can't be matched:
// In ConnectionManagerImpl::onData()
// 1) Via codec_->dispatch(), a local reply with a 404 is sent to the client
// a) ConnectionManagerImpl::doEndStream() issues the first connection close() via
// ConnectionManagerImpl::checkForDeferredClose()
// 2) A second close is issued by a subsequent call to
// ConnectionManagerImpl::checkForDeferredClose() prior to returning from onData()
if (delayed_close_) {
return;
}

delayed_close_ = true;
const bool delayed_close_timeout_set = delayedCloseTimeout().count() > 0;

// NOTE: the delayed close timeout (if set) affects both FlushWrite and FlushWriteAndDelay
// closes:
// 1. For FlushWrite, the timeout sets an upper bound on how long to wait for the flush to
// complete before the connection is locally closed.
// 2. For FlushWriteAndDelay, the timeout specifies an upper bound on how long to wait for the
// flush to complete and the peer to close the connection before it is locally closed.

// All close types that follow do not actually close() the socket immediately so that buffered
// data can be written. However, we do want to stop reading to apply TCP backpressure.
read_enabled_ = false;

// Force a closeSocket() after the write buffer is flushed if the close_type calls for it or if
// no delayed close timeout is set.
close_after_flush_ = !delayed_close_timeout_set || type == ConnectionCloseType::FlushWrite;

// Create and activate a timer which will immediately close the connection if triggered.
// A config value of 0 disables the timeout.
if (delayed_close_timeout_set) {
delayed_close_timer_ = dispatcher_.createTimer([this]() -> void { onDelayedCloseTimeout(); });
ENVOY_CONN_LOG(debug, "setting delayed close timer with timeout {} ms", *this,
delayedCloseTimeout().count());
delayed_close_timer_->enableTimer(delayedCloseTimeout());
}

file_event_->setEnabled(Event::FileReadyType::Write |
(enable_half_close_ ? 0 : Event::FileReadyType::Closed));
}
Expand All @@ -118,7 +160,7 @@ void ConnectionImpl::close(ConnectionCloseType type) {
Connection::State ConnectionImpl::state() const {
if (fd() == -1) {
return State::Closed;
} else if (close_with_flush_) {
} else if (delayed_close_) {
return State::Closing;
} else {
return State::Open;
Expand All @@ -130,6 +172,12 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
return;
}

// No need for a delayed close (if pending) now that the socket is being closed.
if (delayed_close_timer_) {
delayed_close_timer_->disableTimer();
delayed_close_timer_ = nullptr;
}

ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
transport_socket_->closeSocket(close_type);

Expand Down Expand Up @@ -488,7 +536,7 @@ void ConnectionImpl::onWriteReady() {
// write callback. This can happen if we manage to complete the SSL handshake in the write
// callback, raise a connected event, and close the connection.
closeSocket(ConnectionEvent::RemoteClose);
} else if ((close_with_flush_ && new_buffer_size == 0) || bothSidesHalfClosed()) {
} else if ((close_after_flush_ && new_buffer_size == 0) || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "write flush complete", *this);
closeSocket(ConnectionEvent::LocalClose);
} else if (result.action_ == PostIoAction::KeepOpen && result.bytes_processed_ > 0) {
Expand Down Expand Up @@ -535,6 +583,14 @@ bool ConnectionImpl::bothSidesHalfClosed() {
return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0;
}

void ConnectionImpl::onDelayedCloseTimeout() {
ENVOY_CONN_LOG(debug, "triggered delayed close", *this);
if (connection_stats_ != nullptr && connection_stats_->delayed_close_timeouts_ != nullptr) {
connection_stats_->delayed_close_timeouts_->inc();
}
closeSocket(ConnectionEvent::LocalClose);
}

ClientConnectionImpl::ClientConnectionImpl(
Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address,
const Network::Address::InstanceConstSharedPtr& source_address,
Expand Down
13 changes: 12 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ConnectionImpl : public virtual Connection,
// Obtain global next connection ID. This should only be used in tests.
static uint64_t nextGlobalIdForTest() { return next_global_id_; }

void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override {
delayed_close_timeout_ = timeout;
}
std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; }

protected:
void closeSocket(ConnectionEvent close_type);

Expand All @@ -137,6 +142,7 @@ class ConnectionImpl : public virtual Connection,
// a generic pointer.
Buffer::InstancePtr write_buffer_;
uint32_t read_buffer_limit_ = 0;
std::chrono::milliseconds delayed_close_timeout_{0};

protected:
bool connecting_{false};
Expand All @@ -157,14 +163,19 @@ class ConnectionImpl : public virtual Connection,
// Returns true iff end of stream has been both written and read.
bool bothSidesHalfClosed();

// Callback issued when a delayed close timeout triggers.
void onDelayedCloseTimeout();

static std::atomic<uint64_t> next_global_id_;

Event::Dispatcher& dispatcher_;
const uint64_t id_;
Event::TimerPtr delayed_close_timer_;
std::list<ConnectionCallbacks*> callbacks_;
std::list<BytesSentCb> bytes_sent_callbacks_;
bool read_enabled_{true};
bool close_with_flush_{false};
bool close_after_flush_{false};
bool delayed_close_{false};
bool above_high_watermark_{false};
bool detect_early_close_{true};
bool enable_half_close_{false};
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_,
&parent_.host_->cluster().stats().bind_errors_});
&parent_.host_->cluster().stats().bind_errors_, nullptr});

// We just universally set no delay on connections. Theoretically we might at some point want
// to make this configurable.
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec
{config_->stats().downstream_cx_rx_bytes_total_,
config_->stats().downstream_cx_rx_bytes_buffered_,
config_->stats().downstream_cx_tx_bytes_total_,
config_->stats().downstream_cx_tx_bytes_buffered_, nullptr});
config_->stats().downstream_cx_tx_bytes_buffered_, nullptr, nullptr});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig(
date_provider_(date_provider),
listener_stats_(Http::ConnectionManagerImpl::generateListenerStats(stats_prefix_,
context_.listenerScope())),
proxy_100_continue_(config.proxy_100_continue()) {
proxy_100_continue_(config.proxy_100_continue()),
delayed_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, delayed_close_timeout, 1000)) {

route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,
route_config_provider_manager_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class HttpConnectionManagerConfig : Logger::Loggable<Logger::Id::config>,
Http::ConnectionManagerListenerStats& listenerStats() override { return listener_stats_; }
bool proxy100Continue() const override { return proxy_100_continue_; }
const Http::Http1Settings& http1Settings() const override { return http1_settings_; }
std::chrono::milliseconds delayedCloseTimeout() const override { return delayed_close_timeout_; }

private:
typedef std::list<Http::FilterFactoryCb> FilterFactoriesList;
Expand Down Expand Up @@ -170,6 +171,7 @@ class HttpConnectionManagerConfig : Logger::Loggable<Logger::Id::config>,
Http::DateProvider& date_provider_;
Http::ConnectionManagerListenerStats listener_stats_;
const bool proxy_100_continue_;
std::chrono::milliseconds delayed_close_timeout_;

// Default idle timeout is 5 minutes if nothing is specified in the HCM config.
static const uint64_t StreamIdleTimeoutMs = 5 * 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void ProxyFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& ca
config_->stats_.downstream_cx_rx_bytes_buffered_,
config_->stats_.downstream_cx_tx_bytes_total_,
config_->stats_.downstream_cx_tx_bytes_buffered_,
nullptr});
nullptr, nullptr});
}

void ProxyFilter::onRespValue(RespValuePtr&& value) {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/stat_sinks/common/statsd/statsd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) {
parent_.cluster_info_->stats().upstream_cx_rx_bytes_buffered_,
parent_.cluster_info_->stats().upstream_cx_tx_bytes_total_,
parent_.cluster_info_->stats().upstream_cx_tx_bytes_buffered_,
&parent_.cluster_info_->stats().bind_errors_});
&parent_.cluster_info_->stats().bind_errors_, nullptr});
connection_->connect();
}

Expand Down
1 change: 1 addition & 0 deletions source/server/http/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class AdminImpl : public Admin,
bool generateRequestId() override { return false; }
absl::optional<std::chrono::milliseconds> idleTimeout() const override { return idle_timeout_; }
std::chrono::milliseconds streamIdleTimeout() const override { return {}; }
std::chrono::milliseconds delayedCloseTimeout() const override { return {}; }
Router::RouteConfigProvider& routeConfigProvider() override { return route_config_provider_; }
const std::string& serverName() override { return Http::DefaultServerString::get(); }
Http::ConnectionManagerStats& stats() override { return stats_; }
Expand Down
Loading

0 comments on commit b16f529

Please sign in to comment.