Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp_proxy: implement idle timeout and some stats #8999

Merged
merged 12 commits into from
Nov 25, 2019
Merged
12 changes: 10 additions & 2 deletions api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@ import "validate/validate.proto";

// TODO(mattklein123): docs

// Configuration for the UDP proxy filter.
message UdpProxyConfig {
oneof cluster_specifier {
// The stat prefix used when emitting UDP proxy filter stats.
string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}];

oneof route_specifier {
option (validate.required) = true;

// The upstream cluster to connect to.
string cluster = 1 [(validate.rules).string = {min_bytes: 1}];
string cluster = 2 [(validate.rules).string = {min_bytes: 1}];
}

// The idle timeout for sessions. Idle is defined as no datagrams being received or sent by
// the session. The default if not specified is 1 minute.
google.protobuf.Duration idle_timeout = 3;
}
7 changes: 7 additions & 0 deletions include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ class UdpListenerReadFilter {
*/
virtual void onData(UdpRecvData& data) PURE;

/**
* Called when there is an error event in the receive data path.
*
* @param error_code supplies the received error on the listener.
*/
virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE;

protected:
/**
* @param callbacks supplies the read filter callbacks used to interact with the filter manager.
Expand Down
7 changes: 2 additions & 5 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ struct UdpSendData {
*/
class UdpListenerCallbacks {
public:
enum class ErrorCode { SyscallError, UnknownError };

virtual ~UdpListenerCallbacks() = default;

/**
Expand All @@ -225,10 +223,9 @@ class UdpListenerCallbacks {
* Called when there is an error event in the receive data path.
* The send side error is a return type on the send method.
*
* @param error_code ErrorCode for the error event.
* @param error_number System error number.
* @param error_code supplies the received error on the listener.
*/
virtual void onReceiveError(const ErrorCode& error_code, Api::IoError::IoErrorCode err) PURE;
virtual void onReceiveError(Api::IoError::IoErrorCode error_code) PURE;
};

/**
Expand Down
6 changes: 4 additions & 2 deletions source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ void UdpListenerImpl::handleReadCallback() {
socket_->ioHandle(), *socket_->localAddress(), *this, time_source_, packets_dropped_);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast<int>(result->getErrorCode()),
// TODO(mattklein123): When rate limited logging is implemented log this at error level
// on a periodic basis.
ENVOY_UDP_LOG(debug, "recvmsg result {}: {}", static_cast<int>(result->getErrorCode()),
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
result->getErrorDetails());
cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result->getErrorCode());
cb_.onReceiveError(result->getErrorCode());
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlic
send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt);

if (send_result.ok()) {
ENVOY_LOG_MISC(trace, "sendmsg sent:{} bytes", send_result.rc_);
ENVOY_LOG_MISC(trace, "sendmsg bytes {}", send_result.rc_);
} else {
ENVOY_LOG_MISC(debug, "sendmsg failed with error code {}: {}",
static_cast<int>(send_result.err_->getErrorCode()),
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/udp/udp_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class UdpProxyFilterConfigFactory
createFilterFactoryFromProto(const Protobuf::Message& config,
Server::Configuration::ListenerFactoryContext& context) override {
auto shared_config = std::make_shared<UdpProxyFilterConfig>(
context.clusterManager(), context.timeSource(),
context.clusterManager(), context.timeSource(), context.scope(),
MessageUtil::downcastAndValidate<
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig&>(
config, context.messageValidationVisitor()));
Expand Down
67 changes: 52 additions & 15 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ namespace Extensions {
namespace UdpFilters {
namespace UdpProxy {

// TODO(mattklein123): Logging
// TODO(mattklein123): Stats

void UdpProxyFilter::onData(Network::UdpRecvData& data) {
const auto active_session_it = sessions_.find(data.addresses_);
ActiveSession* active_session;
Expand All @@ -18,7 +15,7 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) {
// TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via
// cluster manager callbacks.
Upstream::ThreadLocalCluster* cluster = config_->getCluster();
// TODO(mattklein123): Handle the case where the cluster does not exist.
// TODO(mattklein123): Handle the case where the cluster does not exist and add stat.
ASSERT(cluster != nullptr);

// TODO(mattklein123): Pass a context and support hash based routing.
Expand All @@ -37,18 +34,26 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) {
active_session->write(*data.buffer_);
}

// Called when there is an error in the receive data path. The error code itself is ignored;
// only the downstream_sess_rx_errors counter is incremented.
void UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) {
config_->stats().downstream_sess_rx_errors_.inc();
}

UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent,
Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host)
: parent_(parent), addresses_(std::move(addresses)), host_(host),
idle_timer_(parent.read_callbacks_->udpListener().dispatcher().createTimer(
[this] { onIdleTimer(); })),
// NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port
// is bound until the first packet is sent to the upstream host.
io_handle_(host->address()->socket(Network::Address::SocketType::Datagram)),
io_handle_(parent.createIoHandle(host)),
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent(
io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge,
Event::FileReadyType::Read)) {
ENVOY_LOG(debug, "creating new session: downstream={} local={}", addresses_.peer_->asStringView(),
addresses_.local_->asStringView());
parent_.config_->stats().downstream_sess_total_.inc();
parent_.config_->stats().downstream_sess_active_.inc();

// TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction
// does not work well right now for client sockets. It's too heavy weight and is aimed at listener
Expand All @@ -57,39 +62,71 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent,
// handle.
}

// Decrements the downstream_sess_active gauge, balancing the increment done in the constructor.
UdpProxyFilter::ActiveSession::~ActiveSession() {
parent_.config_->stats().downstream_sess_active_.dec();
}

// Idle timer callback: logs the timeout, increments the idle_timeout counter, and removes this
// session from the parent filter's session set.
void UdpProxyFilter::ActiveSession::onIdleTimer() {
ENVOY_LOG(debug, "session idle timeout: downstream={} local={}", addresses_.peer_->asStringView(),
addresses_.local_->asStringView());
parent_.config_->stats().idle_timeout_.inc();
// NOTE(review): sessions_ stores ActiveSessionPtr entries, so this erase presumably destroys
// `this`. Keep it as the last statement and do not touch members afterwards -- confirm.
parent_.sessions_.erase(addresses_);
}

void UdpProxyFilter::ActiveSession::onReadReady() {
// TODO(mattklein123): Refresh idle timer.
idle_timer_->enableTimer(parent_.config_->sessionTimeout());

// TODO(mattklein123): We should not be passing *addresses_.local_ to this function as we are
// not trying to populate the local address for received packets.
uint32_t packets_dropped = 0;
const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket(
*io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
// TODO(mattklein123): Increment stat on failure.
ASSERT(result->getErrorCode() == Api::IoError::IoErrorCode::Again);
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
// TODO(mattklein123): Upstream cluster RX error stat.
}
}

void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) {
ENVOY_LOG(trace, "writing {} byte datagram: downstream={} local={} upstream={}", buffer.length(),
addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
ENVOY_LOG(trace, "writing {} byte datagram upstream: downstream={} local={} upstream={}",
buffer.length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
parent_.config_->stats().downstream_sess_rx_bytes_.add(buffer.length());
parent_.config_->stats().downstream_sess_rx_datagrams_.inc();

idle_timer_->enableTimer(parent_.config_->sessionTimeout());
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved

// TODO(mattklein123): Refresh idle timer.
// NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to
// port exhaustion.
// NOTE: We do not specify the local IP to use for the sendmsg call. We allow the OS to select
// the right IP based on outbound routing rules.
Api::IoCallUint64Result rc =
Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address());
// TODO(mattklein123): Increment stat on failure.
ASSERT(rc.ok());
if (!rc.ok()) {
// TODO(mattklein123): Upstream cluster TX error stat.
} else {
// TODO(mattklein123): Upstream cluster TX byte/datagram stats.
}
}

void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceConstSharedPtr,
Network::Address::InstanceConstSharedPtr,
Buffer::InstancePtr buffer, MonotonicTime) {
ENVOY_LOG(trace, "writing {} byte datagram downstream: downstream={} local={} upstream={}",
buffer->length(), addresses_.peer_->asStringView(), addresses_.local_->asStringView(),
host_->address()->asStringView());
const uint64_t buffer_length = buffer->length();

// TODO(mattklein123): Upstream cluster RX byte/datagram stats.

Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer};
const Api::IoCallUint64Result rc = parent_.read_callbacks_->udpListener().send(data);
// TODO(mattklein123): Increment stat on failure.
ASSERT(rc.ok());
if (!rc.ok()) {
parent_.config_->stats().downstream_sess_tx_errors_.inc();
} else {
parent_.config_->stats().downstream_sess_tx_bytes_.add(buffer_length);
parent_.config_->stats().downstream_sess_tx_datagrams_.inc();
}
}

} // namespace UdpProxy
Expand Down
56 changes: 51 additions & 5 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,53 @@ namespace Extensions {
namespace UdpFilters {
namespace UdpProxy {

/**
 * All UDP proxy stats. @see stats_macros.h
 *
 * COUNTER is applied to each counter name and GAUGE to the active session gauge, so callers
 * choose what is generated (struct fields, pool lookups, ...) by the macros they pass in.
 */
#define ALL_UDP_PROXY_STATS(COUNTER, GAUGE) \
COUNTER(downstream_sess_rx_bytes) \
COUNTER(downstream_sess_rx_datagrams) \
COUNTER(downstream_sess_rx_errors) \
COUNTER(downstream_sess_total) \
COUNTER(downstream_sess_tx_bytes) \
COUNTER(downstream_sess_tx_datagrams) \
COUNTER(downstream_sess_tx_errors) \
COUNTER(idle_timeout) \
GAUGE(downstream_sess_active, Accumulate)

/**
 * Struct definition for all UDP proxy stats. @see stats_macros.h
 *
 * The members are generated from ALL_UDP_PROXY_STATS, so adding a stat to that list also adds
 * it to this struct.
 */
struct UdpProxyStats {
ALL_UDP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

class UdpProxyFilterConfig {
public:
UdpProxyFilterConfig(Upstream::ClusterManager& cluster_manager, TimeSource& time_source,
Stats::Scope& root_scope,
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig& config)
: cluster_manager_(cluster_manager), time_source_(time_source), config_(config) {}
: cluster_manager_(cluster_manager), time_source_(time_source), cluster_(config.cluster()),
session_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, idle_timeout, 60 * 1000)),
stats_(generateStats(config.stat_prefix(), root_scope)) {}

Upstream::ThreadLocalCluster* getCluster() const {
return cluster_manager_.get(config_.cluster());
}
Upstream::ThreadLocalCluster* getCluster() const { return cluster_manager_.get(cluster_); }
std::chrono::milliseconds sessionTimeout() const { return session_timeout_; }
UdpProxyStats& stats() const { return stats_; }
TimeSource& timeSource() const { return time_source_; }

private:
// Builds the stats struct from `scope`, passing "udp.<stat_prefix>" as the prefix to the
// POOL_*_PREFIX macros.
// NOTE(review): no trailing "." is appended after stat_prefix here -- confirm that the
// POOL_*_PREFIX macros insert the separator between the prefix and the stat name.
static UdpProxyStats generateStats(const std::string& stat_prefix, Stats::Scope& scope) {
const auto final_prefix = fmt::format("udp.{}", stat_prefix);
return {ALL_UDP_PROXY_STATS(POOL_COUNTER_PREFIX(scope, final_prefix),
POOL_GAUGE_PREFIX(scope, final_prefix))};
}

Upstream::ClusterManager& cluster_manager_;
TimeSource& time_source_;
const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config_;
const std::string cluster_;
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
const std::chrono::milliseconds session_timeout_;
mutable UdpProxyStats stats_;
};

using UdpProxyFilterConfigSharedPtr = std::shared_ptr<const UdpProxyFilterConfig>;
Expand All @@ -42,6 +74,7 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L

// Network::UdpListenerReadFilter
void onData(Network::UdpRecvData& data) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;

private:
/**
Expand All @@ -56,10 +89,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
public:
ActiveSession(UdpProxyFilter& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses,
const Upstream::HostConstSharedPtr& host);
~ActiveSession();
const Network::UdpRecvData::LocalPeerAddresses& addresses() { return addresses_; }
void write(const Buffer::Instance& buffer);

private:
void onIdleTimer();
void onReadReady();

// Network::UdpPacketProcessor
Expand All @@ -76,6 +111,12 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
UdpProxyFilter& parent_;
const Network::UdpRecvData::LocalPeerAddresses addresses_;
const Upstream::HostConstSharedPtr host_;
// TODO(mattklein123): Consider replacing an idle timer for each session with a last used
// time stamp and a periodic scan of all sessions to look for timeouts. This solution is simple,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a linked hash map to hold all the sessions in that case.
It could also be one alarm per session, but scheduled once per loop instead of once per packet. It does not necessarily have to be a common idle timer across all sessions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed there is a bunch we can do. Let's tackle this in a follow up. Thanks for the good discussion. :)

// though it might not perform well for high volume traffic. Note that this is how TCP proxy
// idle timeouts work so we should consider unifying the implementation if we move to a time
// stamp and scan approach.
const Event::TimerPtr idle_timer_;
// The IO handle is used for writing packets to the selected upstream host as well as receiving
// packets from the upstream host. Note that a local ephemeral port is bound on the first
// write to the upstream host.
Expand Down Expand Up @@ -116,6 +157,11 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable<L
}
};

// Creates the datagram IO handle a session uses to communicate with the given upstream host,
// by opening a datagram socket from the host's address.
virtual Network::IoHandlePtr createIoHandle(const Upstream::HostConstSharedPtr& host) {
// Virtual so this can be overridden in unit tests.
return host->address()->socket(Network::Address::SocketType::Datagram);
}

const UdpProxyFilterConfigSharedPtr config_;
absl::flat_hash_set<ActiveSessionPtr, HeterogeneousActiveSessionHash,
HeterogeneousActiveSessionEqual>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks,
// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData& data) override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& /*error_code*/,
Api::IoError::IoErrorCode /*err*/) override {
void onReceiveError(Api::IoError::IoErrorCode /*error_code*/) override {
// No-op. Quic can't do anything upon listener error.
}

Expand Down
7 changes: 2 additions & 5 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,8 @@ void ActiveUdpListener::onWriteReady(const Network::Socket&) {
// data
}

void ActiveUdpListener::onReceiveError(const Network::UdpListenerCallbacks::ErrorCode&,
Api::IoError::IoErrorCode) {
// TODO(sumukhs): Determine what to do on receive error.
// Would the filters need to know on error? Can't foresee a scenario where they
// would take an action
// Forwards a receive-path error reported by the UDP listener to the installed read filter.
void ActiveUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) {
read_filter_->onReceiveError(error_code);
}

void ActiveUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) {
Expand Down
3 changes: 1 addition & 2 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ class ActiveUdpListener : public Network::UdpListenerCallbacks,
// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData& data) override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(const Network::UdpListenerCallbacks::ErrorCode& error_code,
Api::IoError::IoErrorCode err) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;

// ActiveListenerImplBase
Network::Listener* listener() override { return udp_listener_.get(); }
Expand Down
Loading