Skip to content

Commit

Permalink
config/stats: add udp statds address as config option (#1019)
Browse files Browse the repository at this point in the history
  • Loading branch information
hennna authored and RomanDzhabarov committed May 26, 2017
1 parent 8d72607 commit a8c446a
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 20 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ maximize the chances of your PR being merged.
deprecation window. We make no guarantees about code or deployments that rely on undocumented
behavior.
* All deprecations/breaking changes will be clearly listed in the release notes.
* See [DEPRECATED.md](DEPRECATED.md)

# Release cadence

Expand Down
10 changes: 10 additions & 0 deletions DEPRECATED.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# DEPRECATED

As of release 1.3.0, Envoy will follow a
[Breaking Change Policy](https://github.com/lyft/envoy/blob/master//CONTRIBUTING.md#breaking-change-policy).

The following features have been DEPRECATED and will be removed in the specified release cycle.

* Version 1.4.0
* Config option `statsd_local_udp_port` has been deprecated and has been replaced with
`statsd_udp_ip_address`.
9 changes: 8 additions & 1 deletion docs/configuration/overview/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ specify miscellaneous configuration.
"cluster_manager": "{...}",
"flags_path": "...",
"statsd_local_udp_port": "...",
"statsd_udp_ip_address": "...",
"statsd_tcp_cluster_name": "...",
"stats_flush_interval_ms": "...",
"watchdog_miss_timeout_ms": "...",
Expand Down Expand Up @@ -45,10 +46,16 @@ flags_path
*(optional, string)* The file system path to search for :ref:`startup flag files
<operations_file_system_flags>`.

statsd_local_udp_port
statsd_local_udp_port (Warning: DEPRECATED and will be removed in 1.4.0)
*(optional, integer)* The UDP port of a locally running statsd compliant listener. If specified,
:ref:`statistics <arch_overview_statistics>` will be flushed to this port.

statsd_udp_ip_address
*(optional, string)* The UDP address of a running statsd compliant listener. If specified,
:ref:`statistics <arch_overview_statistics>` will be flushed to this address. IPv4 addresses should
have format host:port (ex: 127.0.0.1:855). IPv6 addresses should have URL format [host]:port
(ex: [::1]:855).

statsd_tcp_cluster_name
*(optional, string)* The name of a cluster manager cluster that is running a TCP statsd compliant
listener. If specified, Envoy will connect to this cluster to flush :ref:`statistics
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,17 @@ class Main {
*/
virtual Optional<std::string> statsdTcpClusterName() PURE;

// TODO(hennna): DEPRECATED - will be removed in 1.4.0.
/**
* @return Optional<uint32_t> the optional local UDP statsd port to write to.
*/
virtual Optional<uint32_t> statsdUdpPort() PURE;

/**
* @return Optional<std::string> the optional UDP statsd address to write to.
*/
virtual Optional<std::string> statsdUdpIpAddress() PURE;

/**
* @return std::chrono::milliseconds the time interval between flushing to configured stat sinks.
* The server latches counters.
Expand Down
1 change: 1 addition & 0 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF(
"cluster_manager" : {"type" : "object"},
"flags_path" : {"type" : "string"},
"statsd_local_udp_port" : {"type" : "integer"},
"statsd_udp_ip_address" : {"type" : "string"},
"statsd_tcp_cluster_name" : {"type" : "string"},
"stats_flush_interval_ms" : {"type" : "integer"},
"tracing" : {
Expand Down
29 changes: 23 additions & 6 deletions source/common/stats/statsd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ namespace Envoy {
namespace Stats {
namespace Statsd {

Writer::Writer(uint32_t port) {
Network::Address::InstanceConstSharedPtr address(new Network::Address::Ipv4Instance(port));
Writer::Writer(Network::Address::InstanceConstSharedPtr address) {
fd_ = address->socket(Network::Address::SocketType::Datagram);
ASSERT(fd_ != -1);

Expand All @@ -29,7 +28,14 @@ Writer::Writer(uint32_t port) {
UNREFERENCED_PARAMETER(rc);
}

Writer::~Writer() { close(fd_); }
Writer::~Writer() { ASSERT(shutdown_); }

void Writer::shutdown() {
shutdown_ = true;
if (fd_ != -1) {
RELEASE_ASSERT(close(fd_) == 0);
}
}

void Writer::writeCounter(const std::string& name, uint64_t increment) {
std::string message(fmt::format("envoy.{}:{}|c", name, increment));
Expand All @@ -47,19 +53,30 @@ void Writer::writeTimer(const std::string& name, const std::chrono::milliseconds
}

void Writer::send(const std::string& message) {
if (shutdown_) {
return;
}
::send(fd_, message.c_str(), message.size(), MSG_DONTWAIT);
}

UdpStatsdSink::UdpStatsdSink(ThreadLocal::Instance& tls,
Network::Address::InstanceConstSharedPtr address)
: tls_(tls), tls_slot_(tls.allocateSlot()), server_address_(address) {
tls.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<Writer>(this->server_address_);
});
}

void UdpStatsdSink::flushCounter(const std::string& name, uint64_t delta) {
writer().writeCounter(name, delta);
tls_.getTyped<Writer>(tls_slot_).writeCounter(name, delta);
}

void UdpStatsdSink::flushGauge(const std::string& name, uint64_t value) {
writer().writeGauge(name, value);
tls_.getTyped<Writer>(tls_slot_).writeGauge(name, value);
}

void UdpStatsdSink::onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) {
writer().writeTimer(name, ms);
tls_.getTyped<Writer>(tls_slot_).writeTimer(name, ms);
}

TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
Expand Down
23 changes: 13 additions & 10 deletions source/common/stats/statsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,31 @@ namespace Statsd {
/**
* This is a simple UDP localhost writer for statsd messages.
*/
class Writer {
class Writer : public ThreadLocal::ThreadLocalObject {
public:
Writer(uint32_t port);
Writer(Network::Address::InstanceConstSharedPtr address);
~Writer();

void writeCounter(const std::string& name, uint64_t increment);
void writeGauge(const std::string& name, uint64_t value);
void writeTimer(const std::string& name, const std::chrono::milliseconds& ms);
void shutdown() override;
// Called in unit test to validate address.
int getFdForTests() const { return fd_; };

private:
void send(const std::string& message);

int fd_;
bool shutdown_{};
};

/**
* Implementation of Sink that writes to a local UDP statsd port.
* Implementation of Sink that writes to a UDP statsd address.
*/
class UdpStatsdSink : public Sink {
public:
UdpStatsdSink(uint32_t port) : port_(port) {}
UdpStatsdSink(ThreadLocal::Instance& tls, Network::Address::InstanceConstSharedPtr address);

// Stats::Sink
void flushCounter(const std::string& name, uint64_t delta) override;
Expand All @@ -47,14 +51,13 @@ class UdpStatsdSink : public Sink {
onTimespanComplete(name, std::chrono::milliseconds(value));
}
void onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) override;
// Called in unit test to validate writer construction and address.
int getFdForTests() { return tls_.getTyped<Writer>(tls_slot_).getFdForTests(); }

private:
Writer& writer() {
static thread_local Statsd::Writer writer_(port_);
return writer_;
}

uint32_t port_;
ThreadLocal::Instance& tls_;
const uint32_t tls_slot_;
Network::Address::InstanceConstSharedPtr server_address_;
};

/**
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ envoy_cc_library(
"//source/common/common:utility_lib",
"//source/common/common:version_lib",
"//source/common/memory:stats_lib",
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/runtime:runtime_lib",
"//source/common/ssl:context_lib",
Expand Down
10 changes: 10 additions & 0 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,20 @@ void MainImpl::initialize(const Json::Object& json) {
Server::Configuration::ListenerPtr{new ListenerConfig(*this, *listeners[i])});
}

if (json.hasObject("statsd_local_udp_port") && json.hasObject("statsd_udp_ip_address")) {
throw EnvoyException("statsd_local_udp_port and statsd_udp_ip_address "
"are mutually exclusive.");
}

// TODO(hennna): DEPRECATED - statsd_local_udp_port will be removed in 1.4.0.
if (json.hasObject("statsd_local_udp_port")) {
statsd_udp_port_.value(json.getInteger("statsd_local_udp_port"));
}

if (json.hasObject("statsd_udp_ip_address")) {
statsd_udp_ip_address_.value(json.getString("statsd_udp_ip_address"));
}

if (json.hasObject("statsd_tcp_cluster_name")) {
statsd_tcp_cluster_name_.value(json.getString("statsd_tcp_cluster_name"));
}
Expand Down
3 changes: 3 additions & 0 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
const std::list<ListenerPtr>& listeners() override;
RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; }
Optional<std::string> statsdTcpClusterName() override { return statsd_tcp_cluster_name_; }
// TODO(hennna): DEPRECATED - statsdUdpPort() will be removed in 1.4.0
Optional<uint32_t> statsdUdpPort() override { return statsd_udp_port_; }
Optional<std::string> statsdUdpIpAddress() override { return statsd_udp_ip_address_; }
std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; }
std::chrono::milliseconds wdMissTimeout() const override { return watchdog_miss_timeout_; }
std::chrono::milliseconds wdMegaMissTimeout() const override {
Expand Down Expand Up @@ -263,6 +265,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
std::list<Server::Configuration::ListenerPtr> listeners_;
Optional<std::string> statsd_tcp_cluster_name_;
Optional<uint32_t> statsd_udp_port_;
Optional<std::string> statsd_udp_ip_address_;
RateLimit::ClientFactoryPtr ratelimit_client_factory_;
std::chrono::milliseconds stats_flush_interval_;
std::chrono::milliseconds watchdog_miss_timeout_;
Expand Down
16 changes: 14 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/common/utility.h"
#include "common/common/version.h"
#include "common/memory/stats.h"
#include "common/network/address_impl.h"
#include "common/network/utility.h"
#include "common/runtime/runtime_impl.h"
#include "common/stats/statsd.h"
Expand Down Expand Up @@ -320,9 +321,20 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
}

void InstanceImpl::initializeStatSinks() {
if (config_->statsdUdpPort().valid()) {
if (config_->statsdUdpIpAddress().valid()) {
log().info("statsd UDP ip address: {}", config_->statsdUdpIpAddress().value());
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(
thread_local_,
Network::Utility::parseInternetAddressAndPort(config_->statsdUdpIpAddress().value())));
stats_store_.addSink(*stat_sinks_.back());
} else if (config_->statsdUdpPort().valid()) {
// TODO(hennna): DEPRECATED - statsdUdpPort will be removed in 1.4.0.
log().warn("statsd_local_udp_port has been DEPRECATED and will be removed in 1.4.0. "
"Consider setting statsd_udp_ip_address instead.");
log().info("statsd UDP port: {}", config_->statsdUdpPort().value());
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(config_->statsdUdpPort().value()));
Network::Address::InstanceConstSharedPtr address(
new Network::Address::Ipv4Instance(config_->statsdUdpPort().value()));
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(thread_local_, address));
stats_store_.addSink(*stat_sinks_.back());
}

Expand Down
13 changes: 13 additions & 0 deletions test/common/stats/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "udp_statsd_test",
srcs = ["udp_statsd_test.cc"],
deps = [
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/stats:statsd_lib",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:environment_lib",
"//test/test_common:network_utility_lib",
],
)

envoy_cc_test(
name = "thread_local_store_test",
srcs = ["thread_local_store_test.cc"],
Expand Down
51 changes: 51 additions & 0 deletions test/common/stats/udp_statsd_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <chrono>

#include "common/network/address_impl.h"
#include "common/network/utility.h"
#include "common/stats/statsd.h"

#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/environment.h"
#include "test/test_common/network_utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "spdlog/spdlog.h"

namespace Envoy {
using testing::NiceMock;

namespace Stats {
namespace Statsd {

class UdpStatsdSinkTest : public testing::TestWithParam<Network::Address::IpVersion> {};
INSTANTIATE_TEST_CASE_P(IpVersions, UdpStatsdSinkTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

TEST_P(UdpStatsdSinkTest, InitWithIpAddress) {
NiceMock<ThreadLocal::MockInstance> tls_;
// UDP statsd server address.
Network::Address::InstanceConstSharedPtr server_address =
Network::Utility::parseInternetAddressAndPort(
fmt::format("{}:8125", Network::Test::getLoopbackAddressUrlString(GetParam())));
UdpStatsdSink sink(tls_, server_address);
int fd = sink.getFdForTests();
EXPECT_NE(fd, -1);

// Check that fd has not changed.
sink.flushCounter("test_counter", 1);
sink.flushGauge("test_gauge", 1);
sink.onTimespanComplete("test_counter", std::chrono::milliseconds(5));
EXPECT_EQ(fd, sink.getFdForTests());

if (GetParam() == Network::Address::IpVersion::v4) {
EXPECT_EQ("127.0.0.1:8125", Network::Address::peerAddressFromFd(fd)->asString());
} else {
EXPECT_EQ("[::1]:8125", Network::Address::peerAddressFromFd(fd)->asString());
}
tls_.shutdownThread();
}

} // Statsd
} // Stats
} // Envoy
2 changes: 1 addition & 1 deletion test/config/integration/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@

"admin": { "access_log_path": "/dev/null", "profile_path": "{{ test_tmpdir }}/envoy.prof", "address": "tcp://127.0.0.1:0" },
"flags_path": "/invalid_flags",
"statsd_local_udp_port": 8125,
"statsd_udp_ip_address": "127.0.0.1:8125",
"statsd_tcp_cluster_name": "statsd",
"tracing": {
"http": {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class MockMain : public Main {
MOCK_METHOD0(rateLimitClientFactory, RateLimit::ClientFactory&());
MOCK_METHOD0(statsdTcpClusterName, Optional<std::string>());
MOCK_METHOD0(statsdUdpPort, Optional<uint32_t>());
MOCK_METHOD0(statsdUdpIpAddress, Optional<std::string>());
MOCK_METHOD0(statsFlushInterval, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMissTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMegaMissTimeout, std::chrono::milliseconds());
Expand Down

0 comments on commit a8c446a

Please sign in to comment.