diff --git a/admin/api/routes.go b/admin/api/routes.go index 10c2ac373..78bc40a63 100644 --- a/admin/api/routes.go +++ b/admin/api/routes.go @@ -59,8 +59,8 @@ func (h *RoutesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Weight: tg.Weight, Tags: tg.Tags, Cmd: "route add", - Rate1: tg.Timer.Rate1(), - Pct99: tg.Timer.Percentile(0.99), + // Rate1: tg.Timer.Rate1(), + // Pct99: tg.Timer.Percentile(0.99), } routes = append(routes, ar) } diff --git a/admin/server.go b/admin/server.go index 03a4bca01..ff542b37a 100644 --- a/admin/server.go +++ b/admin/server.go @@ -9,7 +9,9 @@ import ( "github.com/fabiolb/fabio/admin/api" "github.com/fabiolb/fabio/admin/ui" "github.com/fabiolb/fabio/config" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/proxy" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // Server provides the HTTP server for the admin UI and API. @@ -20,6 +22,7 @@ type Server struct { Version string Commands string Cfg *config.Config + Metrics *metrics4.Provider } // ListenAndServe starts the admin server. @@ -64,12 +67,21 @@ func (s *Server) handler() http.Handler { mux.Handle("/api/routes", &api.RoutesHandler{}) mux.Handle("/api/version", &api.VersionHandler{Version: s.Version}) mux.Handle("/routes", &ui.RoutesHandler{Color: s.Color, Title: s.Title, Version: s.Version}) + + initMetricsHandlers(mux, s) + mux.HandleFunc("/logo.svg", ui.HandleLogo) mux.HandleFunc("/health", handleHealth) mux.Handle("/", http.RedirectHandler("/routes", http.StatusSeeOther)) return mux } +func initMetricsHandlers(mux *http.ServeMux, s *Server) { + if strings.Contains(s.Cfg.Metrics.Target, "prometheus") && s.Cfg.Metrics.Prometheus.MetricsEndpoint != "" { + mux.HandleFunc(s.Cfg.Metrics.Prometheus.MetricsEndpoint, promhttp.Handler().ServeHTTP) + } +} + func handleHealth(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") } diff --git a/config/config.go b/config/config.go index f8ad7e4c2..7ef0c555d 100644 --- a/config/config.go +++ b/config/config.go @@ -95,15 +95,26 @@ type Log struct { } type Metrics struct { - Target string - Prefix string - Names string - Interval time.Duration - Timeout time.Duration - Retry time.Duration - GraphiteAddr string - StatsDAddr string - Circonus Circonus + Target string + Prefix string + Interval time.Duration + Prometheus Prometheus + StatsD StatsD + Graphite Graphite + Circonus Circonus +} + +type Graphite struct { + Addr string +} + +type StatsD struct { + Addr string + SampleRate float64 +} + +type Prometheus struct { + MetricsEndpoint string } type Registry struct { diff --git a/config/default.go b/config/default.go index 45f68424d..805ece8ed 100644 --- a/config/default.go +++ b/config/default.go @@ -26,11 +26,18 @@ var defaultConfig = &Config{ Level: "INFO", }, Metrics: Metrics{ - Prefix: "{{clean .Hostname}}.{{clean .Exec}}", - Names: "{{clean .Service}}.{{clean .Host}}.{{clean .Path}}.{{clean .TargetURL.Host}}", + //Prefix: "{{clean .Hostname}}.{{clean .Exec}}", + //Names: "{{clean .Service}}.{{clean .Host}}.{{clean .Path}}.{{clean .TargetURL.Host}}", Interval: 30 * time.Second, - Timeout: 10 * time.Second, - Retry: 500 * time.Millisecond, + //Timeout: 10 * time.Second, + //Retry: 500 * time.Millisecond, + Prometheus: Prometheus{ + MetricsEndpoint: "/metrics/prometheus", + }, + StatsD: StatsD{ + SampleRate: 1, + }, + Graphite: Graphite{}, Circonus: Circonus{ APIApp: "fabio", }, diff --git a/config/load.go b/config/load.go index d75188471..22c62d5e7 100644 --- a/config/load.go +++ b/config/load.go @@ -143,19 +143,25 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c f.StringVar(&cfg.Log.AccessTarget, "log.access.target", defaultConfig.Log.AccessTarget, "access log target") f.StringVar(&cfg.Log.RoutesFormat, "log.routes.format", defaultConfig.Log.RoutesFormat, "log format of routing table updates") f.StringVar(&cfg.Log.Level, "log.level", defaultConfig.Log.Level, "log level: TRACE, DEBUG, INFO, WARN, ERROR, FATAL") + f.StringVar(&cfg.Metrics.Target, "metrics.target", defaultConfig.Metrics.Target, "metrics backend") + f.StringVar(&cfg.Metrics.Prefix, "metrics.prefix", defaultConfig.Metrics.Prefix, "prefix for reported metrics") - f.StringVar(&cfg.Metrics.Names, "metrics.names", defaultConfig.Metrics.Names, "route metric name template") f.DurationVar(&cfg.Metrics.Interval, "metrics.interval", defaultConfig.Metrics.Interval, "metrics reporting interval") - f.DurationVar(&cfg.Metrics.Timeout, "metrics.timeout", defaultConfig.Metrics.Timeout, "timeout for metrics to become available") - f.DurationVar(&cfg.Metrics.Retry, "metrics.retry", defaultConfig.Metrics.Retry, "retry interval during startup") - f.StringVar(&cfg.Metrics.GraphiteAddr, "metrics.graphite.addr", defaultConfig.Metrics.GraphiteAddr, "graphite server address") - f.StringVar(&cfg.Metrics.StatsDAddr, "metrics.statsd.addr", defaultConfig.Metrics.StatsDAddr, "statsd server address") + + f.StringVar(&cfg.Metrics.Prometheus.MetricsEndpoint, "metrics.prometheus.endpoint", defaultConfig.Metrics.Prometheus.MetricsEndpoint, "metrics endpoint for Prometheus") + + f.StringVar(&cfg.Metrics.Graphite.Addr, "metrics.graphite.addr", defaultConfig.Metrics.Graphite.Addr, "graphite carbon receiver or aggregator (plaintext) address") + + f.StringVar(&cfg.Metrics.StatsD.Addr, "metrics.statsd.addr", defaultConfig.Metrics.StatsD.Addr, "statsd server address") + f.Float64Var(&cfg.Metrics.StatsD.SampleRate, "metrics.statsd.sampleRate", defaultConfig.Metrics.StatsD.SampleRate, "statsd sample rate") + f.StringVar(&cfg.Metrics.Circonus.APIKey, "metrics.circonus.apikey", defaultConfig.Metrics.Circonus.APIKey, "Circonus API token key") f.StringVar(&cfg.Metrics.Circonus.APIApp, "metrics.circonus.apiapp", defaultConfig.Metrics.Circonus.APIApp, "Circonus API token app") f.StringVar(&cfg.Metrics.Circonus.APIURL, "metrics.circonus.apiurl", defaultConfig.Metrics.Circonus.APIURL, "Circonus API URL") f.StringVar(&cfg.Metrics.Circonus.BrokerID, "metrics.circonus.brokerid", defaultConfig.Metrics.Circonus.BrokerID, "Circonus Broker ID") f.StringVar(&cfg.Metrics.Circonus.CheckID, "metrics.circonus.checkid", defaultConfig.Metrics.Circonus.CheckID, "Circonus Check ID") + f.StringVar(&cfg.Registry.Backend, "registry.backend", defaultConfig.Registry.Backend, "registry backend") f.DurationVar(&cfg.Registry.Timeout, "registry.timeout", defaultConfig.Registry.Timeout, "timeout for registry to become available") f.DurationVar(&cfg.Registry.Retry, "registry.retry", defaultConfig.Registry.Retry, "retry interval during startup") diff --git a/config/load_test.go b/config/load_test.go index bf7c9f288..adfcb5d3a 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -646,13 +646,6 @@ func TestLoad(t *testing.T) { return cfg }, }, - { - args: []string{"-metrics.names", "some names"}, - cfg: func(cfg *Config) *Config { - cfg.Metrics.Names = "some names" - return cfg - }, - }, { args: []string{"-metrics.interval", "5ms"}, cfg: func(cfg *Config) *Config { diff --git a/main.go b/main.go index 601503c6a..2f40b15d6 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,10 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/fabiolb/fabio/metrics4/circonus" + "github.com/fabiolb/fabio/metrics4/graphite" + "github.com/fabiolb/fabio/metrics4/prefix" + "github.com/fabiolb/fabio/metrics4/stdout" "io" "log" "net" @@ -22,7 +26,9 @@ import ( "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/exit" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/prometheus" + "github.com/fabiolb/fabio/metrics4/statsd" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy" "github.com/fabiolb/fabio/proxy/tcp" @@ -42,7 +48,7 @@ import ( // // It is also set by the linker when fabio // is built via the Makefile or the build/docker.sh -// script to ensure the correct version nubmer +// script to ensure the correct version number var version = "1.5.8" var shuttingDown int32 @@ -73,7 +79,7 @@ func main() { // warn once so that it is at the beginning of the log // this will also start the reminder go routine if necessary. - WarnIfRunAsRoot(cfg.Insecure) + //WarnIfRunAsRoot(cfg.Insecure) // setup profiling if enabled var prof interface { @@ -113,9 +119,8 @@ func main() { registry.Default.DeregisterAll() }) - // init metrics early since that create the global metric registries - // that are used by other parts of the code. - initMetrics(cfg) + metrics := initMetrics(cfg) + initRuntime(cfg) initBackend(cfg) startAdmin(cfg) @@ -123,21 +128,21 @@ func main() { go watchNoRouteHTML(cfg) first := make(chan bool) - go watchBackend(cfg, first) + go watchBackend(cfg, metrics, first) log.Print("[INFO] Waiting for first routing table") <-first // create proxies after metrics since they use the metrics registry. - startServers(cfg) + startServers(cfg, metrics) // warn again so that it is visible in the terminal - WarnIfRunAsRoot(cfg.Insecure) + //WarnIfRunAsRoot(cfg.Insecure) exit.Wait() log.Print("[INFO] Down") } -func newHTTPProxy(cfg *config.Config) http.Handler { +func newHTTPProxy(cfg *config.Config, stats metrics4.Provider) *proxy.HTTPProxy { var w io.Writer switch cfg.Log.AccessTarget { case "": @@ -164,7 +169,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler { pick := route.Picker[cfg.Proxy.Strategy] match := route.Matcher[cfg.Proxy.Matcher] - notFound := metrics.DefaultRegistry.GetCounter("notfound") + notFound := stats.NewCounter("notfound") log.Printf("[INFO] Using routing strategy %q", cfg.Proxy.Strategy) log.Printf("[INFO] Using route matching %q", cfg.Proxy.Matcher) @@ -187,24 +192,25 @@ func newHTTPProxy(cfg *config.Config) http.Handler { Lookup: func(r *http.Request) *route.Target { t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match) if t == nil { - notFound.Inc(1) + notFound.Add(1) log.Print("[WARN] No route for ", r.Host, r.URL) } return t }, - Requests: metrics.DefaultRegistry.GetTimer("requests"), - Noroute: metrics.DefaultRegistry.GetCounter("notfound"), + Requests: stats.NewTimer("requests"), + Noroute: stats.NewCounter("notfound"), + WSConn: stats.NewGauge("ws.conn"), + Metrics: stats, Logger: l, } } -func lookupHostFn(cfg *config.Config) func(string) *route.Target { +func lookupHostFn(cfg *config.Config, notFound metrics4.Counter) func(string) *route.Target { pick := route.Picker[cfg.Proxy.Strategy] - notFound := metrics.DefaultRegistry.GetCounter("notfound") return func(host string) *route.Target { t := route.GetTable().LookupHost(host, pick) if t == nil { - notFound.Inc(1) + notFound.Add(1) log.Print("[WARN] No route for ", host) } return t @@ -249,7 +255,7 @@ func startAdmin(cfg *config.Config) { }() } -func startServers(cfg *config.Config) { +func startServers(cfg *config.Config, stats metrics4.Provider) { for _, l := range cfg.Listen { l := l // capture loop var for go routines below tlscfg, err := makeTLSConfig(l) @@ -262,10 +268,13 @@ func startServers(cfg *config.Config) { log.Printf("[INFO] Client certificate authentication enabled on %s", l.Addr) } + notFound := stats.NewCounter("notfound") switch l.Proto { case "http", "https": go func() { - h := newHTTPProxy(cfg) + h := newHTTPProxy(cfg, stats) + // reset the ws.conn gauge + h.WSConn.Set(0) if err := proxy.ListenAndServeHTTP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) } @@ -274,10 +283,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.Proxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp.conn"), + ConnFail: stats.NewCounter("tcp.connfail"), + Noroute: stats.NewCounter("tcp.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -287,10 +297,11 @@ func startServers(cfg *config.Config) { go func() { h := &tcp.SNIProxy{ DialTimeout: cfg.Proxy.DialTimeout, - Lookup: lookupHostFn(cfg), - Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"), - ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"), - Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"), + Lookup: lookupHostFn(cfg, notFound), + Conn: stats.NewCounter("tcp_sni.conn"), + ConnFail: stats.NewCounter("tcp_sni.connfail"), + Noroute: stats.NewCounter("tcp_sni.noroute"), + Metrics: stats, } if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { exit.Fatal("[FATAL] ", err) @@ -302,31 +313,44 @@ func startServers(cfg *config.Config) { } } -func initMetrics(cfg *config.Config) { - if cfg.Metrics.Target == "" { - log.Printf("[INFO] Metrics disabled") - return - } +func initMetrics(cfg *config.Config) metrics4.Provider { + prefix.InitPrefix(cfg.Metrics.Prefix) + var p []metrics4.Provider + for _, x := range strings.Split(cfg.Metrics.Target, ",") { + x = strings.TrimSpace(x) + switch x { + case "prometheus": + p = append(p, prometheus.NewProvider(prefix.GetPrefix())) + case "graphite": + provider, err := graphite.NewProvider(cfg.Metrics.Graphite, cfg.Metrics.Interval, prefix.GetPrefix()) + + if err != nil { + exit.Fatalf("[FATAL] Cannot initialize graphite metrics: %s", err) + } - var deadline = time.Now().Add(cfg.Metrics.Timeout) - var err error - for { - metrics.DefaultRegistry, err = metrics.NewRegistry(cfg.Metrics) - if err == nil { - route.ServiceRegistry, err = metrics.NewRegistry(cfg.Metrics) - } - if err == nil { - return - } - if time.Now().After(deadline) { - exit.Fatal("[FATAL] ", err) - } - log.Print("[WARN] Error initializing metrics. ", err) - time.Sleep(cfg.Metrics.Retry) - if atomic.LoadInt32(&shuttingDown) > 0 { - exit.Exit(1) + p = append(p, provider) + case "statsd": + p = append(p, statsd.NewProvider(cfg.Metrics.StatsD, cfg.Metrics.Interval, prefix.GetPrefix())) + case "stdout": + p = append(p, stdout.NewProvider(cfg.Metrics.Interval)) + case "circonus": + provider, err := circonus.NewProvider(cfg.Metrics.Circonus, cfg.Metrics.Interval, prefix.GetPrefix()) + + if err != nil { + exit.Fatalf("[FATAL] Cannot initialize circonus metrics: %s", err) + } + + p = append(p, provider) + default: + log.Printf("[WARN] Skipping unknown metrics provider %q", x) + continue } + log.Printf("[INFO] Registering metrics provider %q", x) } + if len(p) == 0 { + log.Printf("[INFO] Metrics disabled") + } + return metrics4.NewMultiProvider(p) } func initRuntime(cfg *config.Config) { @@ -379,7 +403,7 @@ func initBackend(cfg *config.Config) { } } -func watchBackend(cfg *config.Config, first chan bool) { +func watchBackend(cfg *config.Config, p metrics4.Provider, first chan bool) { var ( last string svccfg string @@ -410,19 +434,46 @@ func watchBackend(cfg *config.Config, first chan bool) { } registry.Default.Register(aliases) - t, err := route.NewTable(next) + newTable, err := route.NewTable(next) if err != nil { log.Printf("[WARN] %s", err) continue } - route.SetTable(t) - logRoutes(t, last, next, cfg.Log.RoutesFormat) + + oldTable := route.GetTable() + route.SetTable(newTable) + unregisterMetrics(p, oldTable, newTable) + logRoutes(newTable, last, next, cfg.Log.RoutesFormat) + last = next once.Do(func() { close(first) }) } } +func unregisterMetrics(p metrics4.Provider, oldTable, newTable route.Table) { + names := func(t route.Table) map[string]bool { + m := map[string]bool{} + for _, routes := range t { + for _, r := range routes { + for _, t := range r.Targets { + m[t.TimerName.String()] = true + } + } + } + return m + } + + oldNames := names(oldTable) + newNames := names(newTable) + for n := range oldNames { + if !newNames[n] { + log.Printf("[INFO] Unregistering metric %s", n) + //p.Unregister(n) + } + } +} + func watchNoRouteHTML(cfg *config.Config) { html := registry.Default.WatchNoRouteHTML() for { diff --git a/metrics/circonus.go b/metrics/circonus.go deleted file mode 100644 index c4bfe8f69..000000000 --- a/metrics/circonus.go +++ /dev/null @@ -1,128 +0,0 @@ -package metrics - -import ( - "errors" - "fmt" - "log" - "os" - "sync" - "time" - - cgm "github.com/circonus-labs/circonus-gometrics" - "github.com/fabiolb/fabio/config" -) - -var ( - circonus *cgmRegistry - once sync.Once -) - -const serviceName = "fabio" - -// circonusRegistry returns a provider that reports to Circonus. -func circonusRegistry(prefix string, circ config.Circonus, interval time.Duration) (Registry, error) { - var initError error - - once.Do(func() { - if circ.APIKey == "" { - initError = errors.New("metrics: Circonus API token key") - return - } - - if circ.APIApp == "" { - circ.APIApp = serviceName - } - - host, err := os.Hostname() - if err != nil { - initError = fmt.Errorf("metrics: unable to initialize Circonus %s", err) - return - } - - cfg := &cgm.Config{} - - cfg.CheckManager.API.TokenKey = circ.APIKey - cfg.CheckManager.API.TokenApp = circ.APIApp - cfg.CheckManager.API.URL = circ.APIURL - cfg.CheckManager.Check.ID = circ.CheckID - cfg.CheckManager.Broker.ID = circ.BrokerID - cfg.Interval = fmt.Sprintf("%.0fs", interval.Seconds()) - cfg.CheckManager.Check.InstanceID = host - cfg.CheckManager.Check.DisplayName = fmt.Sprintf("%s /%s", host, serviceName) - cfg.CheckManager.Check.SearchTag = fmt.Sprintf("service:%s", serviceName) - - metrics, err := cgm.NewCirconusMetrics(cfg) - if err != nil { - initError = fmt.Errorf("metrics: unable to initialize Circonus %s", err) - return - } - - circonus = &cgmRegistry{metrics, prefix} - - metrics.Start() - - log.Print("[INFO] Sending metrics to Circonus") - }) - - return circonus, initError -} - -type cgmRegistry struct { - metrics *cgm.CirconusMetrics - prefix string -} - -// Names is not supported by Circonus. -func (m *cgmRegistry) Names() []string { return nil } - -// Unregister is implicitly supported by Circonus, -// stop submitting the metric and it stops being sent to Circonus. -func (m *cgmRegistry) Unregister(name string) {} - -// UnregisterAll is implicitly supported by Circonus, -// stop submitting metrics and they will no longer be sent to Circonus. -func (m *cgmRegistry) UnregisterAll() {} - -// GetCounter returns a counter for the given metric name. -func (m *cgmRegistry) GetCounter(name string) Counter { - metricName := fmt.Sprintf("%s`%s", m.prefix, name) - return &cgmCounter{m.metrics, metricName} -} - -// GetTimer returns a timer for the given metric name. -func (m *cgmRegistry) GetTimer(name string) Timer { - metricName := fmt.Sprintf("%s`%s", m.prefix, name) - return &cgmTimer{m.metrics, metricName} -} - -type cgmCounter struct { - metrics *cgm.CirconusMetrics - name string -} - -// Inc increases the counter by n. -func (c *cgmCounter) Inc(n int64) { - c.metrics.IncrementByValue(c.name, uint64(n)) -} - -type cgmTimer struct { - metrics *cgm.CirconusMetrics - name string -} - -// Percentile is not supported by Circonus. -func (t *cgmTimer) Percentile(nth float64) float64 { return 0 } - -// Rate1 is not supported by Circonus. -func (t *cgmTimer) Rate1() float64 { return 0 } - -func (t *cgmTimer) Update(d time.Duration) { - t.metrics.Timing(t.name, float64(d)) -} - -// UpdateSince adds delta between start and current time as -// a sample to a histogram. The histogram is created if it -// does not already exist. -func (t *cgmTimer) UpdateSince(start time.Time) { - t.metrics.Timing(t.name, float64(time.Since(start))) -} diff --git a/metrics/circonus_test.go b/metrics/circonus_test.go deleted file mode 100644 index 808b3e183..000000000 --- a/metrics/circonus_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package metrics - -import ( - "os" - "testing" - "time" - - "github.com/fabiolb/fabio/config" -) - -func TestRegistry(t *testing.T) { - t.Log("Testing registry interface") - - p := &cgmRegistry{} - - t.Log("\tNames()") - names := p.Names() - if names != nil { - t.Errorf("Expected nil got '%+v'", names) - } - - t.Log("\tUnregister()") - p.Unregister("foo") - - t.Log("\tUnregisterAll()") - p.UnregisterAll() - - t.Log("\tGetTimer()") - timer := p.GetTimer("foo") - if timer == nil { - t.Error("Expected a timer, got nil") - } -} - -func TestTimer(t *testing.T) { - t.Log("Testing timer interface") - - timer := &cgmTimer{} - - t.Log("\tPercentile()") - pct := timer.Percentile(99.9) - if pct != 0 { - t.Errorf("Expected 0 got '%+v'", pct) - } - - t.Log("\tRate1()") - rate := timer.Rate1() - if rate != 0 { - t.Errorf("Expected 0 got '%+v'", rate) - } -} - -func TestAll(t *testing.T) { - start := time.Now() - - if os.Getenv("CIRCONUS_API_TOKEN") == "" { - t.Skip("skipping test; $CIRCONUS_API_TOKEN not set") - } - - t.Log("Testing cgm functionality -- this *will* create/use a check") - - cfg := config.Circonus{ - APIKey: os.Getenv("CIRCONUS_API_TOKEN"), - APIApp: os.Getenv("CIRCONUS_API_APP"), - APIURL: os.Getenv("CIRCONUS_API_URL"), - CheckID: os.Getenv("CIRCONUS_CHECK_ID"), - BrokerID: os.Getenv("CIRCONUS_BROKER_ID"), - } - - interval, err := time.ParseDuration("60s") - if err != nil { - t.Fatalf("Unable to parse interval %+v", err) - } - - circ, err := circonusRegistry("test", cfg, interval) - if err != nil { - t.Fatalf("Unable to initialize Circonus +%v", err) - } - - counter := circ.GetCounter("fooCounter") - counter.Inc(3) - - timer := circ.GetTimer("fooTimer") - timer.UpdateSince(start) - - circonus.metrics.Flush() -} diff --git a/metrics/gometrics.go b/metrics/gometrics.go deleted file mode 100644 index e219f430e..000000000 --- a/metrics/gometrics.go +++ /dev/null @@ -1,85 +0,0 @@ -package metrics - -import ( - "errors" - "fmt" - "log" - "net" - "os" - "sort" - "time" - - graphite "github.com/cyberdelia/go-metrics-graphite" - statsd "github.com/magiconair/go-metrics-statsd" - gm "github.com/rcrowley/go-metrics" -) - -// gmStdoutRegistry returns a go-metrics registry that reports to stdout. -func gmStdoutRegistry(interval time.Duration) (Registry, error) { - logger := log.New(os.Stderr, "localhost: ", log.Lmicroseconds) - r := gm.NewRegistry() - go gm.Log(r, interval, logger) - return &gmRegistry{r}, nil -} - -// gmGraphiteRegistry returns a go-metrics registry that reports to a Graphite server. -func gmGraphiteRegistry(prefix, addr string, interval time.Duration) (Registry, error) { - if addr == "" { - return nil, errors.New(" graphite addr missing") - } - - a, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - return nil, fmt.Errorf(" cannot connect to Graphite: %s", err) - } - - r := gm.NewRegistry() - go graphite.Graphite(r, interval, prefix, a) - return &gmRegistry{r}, nil -} - -// gmStatsDRegistry returns a go-metrics registry that reports to a StatsD server. -func gmStatsDRegistry(prefix, addr string, interval time.Duration) (Registry, error) { - if addr == "" { - return nil, errors.New(" statsd addr missing") - } - - a, err := net.ResolveUDPAddr("udp", addr) - if err != nil { - return nil, fmt.Errorf(" cannot connect to StatsD: %s", err) - } - - r := gm.NewRegistry() - go statsd.StatsD(r, interval, prefix, a) - return &gmRegistry{r}, nil -} - -// gmRegistry implements the Registry interface -// using the github.com/rcrowley/go-metrics library. -type gmRegistry struct { - r gm.Registry -} - -func (p *gmRegistry) Names() (names []string) { - p.r.Each(func(name string, _ interface{}) { - names = append(names, name) - }) - sort.Strings(names) - return names -} - -func (p *gmRegistry) Unregister(name string) { - p.r.Unregister(name) -} - -func (p *gmRegistry) UnregisterAll() { - p.r.UnregisterAll() -} - -func (p *gmRegistry) GetCounter(name string) Counter { - return gm.GetOrRegisterCounter(name, p.r) -} - -func (p *gmRegistry) GetTimer(name string) Timer { - return gm.GetOrRegisterTimer(name, p.r) -} diff --git a/metrics/metrics.go b/metrics/metrics.go deleted file mode 100644 index 2be4c475b..000000000 --- a/metrics/metrics.go +++ /dev/null @@ -1,157 +0,0 @@ -// Package metrics provides functions for collecting -// and managing metrics through different metrics libraries. -// -// Metrics library implementations must implement the -// Registry interface in the package. -package metrics - -import ( - "bytes" - "fmt" - "log" - "net/url" - "os" - "path/filepath" - "strings" - "text/template" - - "github.com/fabiolb/fabio/config" - "github.com/fabiolb/fabio/exit" -) - -// DefaultRegistry stores the metrics library provider. -var DefaultRegistry Registry = NoopRegistry{} - -// DefaultNames contains the default template for route metric names. -const DefaultNames = "{{clean .Service}}.{{clean .Host}}.{{clean .Path}}.{{clean .TargetURL.Host}}" - -// DefaulPrefix contains the default template for metrics prefix. -const DefaultPrefix = "{{clean .Hostname}}.{{clean .Exec}}" - -// names stores the template for the route metric names. -var names *template.Template - -// prefix stores the final prefix string to use it with metric collectors where applicable, i.e. Graphite/StatsD -var prefix string - -func init() { - // make sure names is initialized to something - var err error - if names, err = parseNames(DefaultNames); err != nil { - panic(err) - } -} - -// NewRegistry creates a new metrics registry. -func NewRegistry(cfg config.Metrics) (r Registry, err error) { - - if prefix, err = parsePrefix(cfg.Prefix); err != nil { - return nil, fmt.Errorf("metrics: invalid Prefix template. %s", err) - } - - if names, err = parseNames(cfg.Names); err != nil { - return nil, fmt.Errorf("metrics: invalid names template. %s", err) - } - - switch cfg.Target { - case "stdout": - log.Printf("[INFO] Sending metrics to stdout") - return gmStdoutRegistry(cfg.Interval) - - case "graphite": - log.Printf("[INFO] Sending metrics to Graphite on %s as %q", cfg.GraphiteAddr, prefix) - return gmGraphiteRegistry(prefix, cfg.GraphiteAddr, cfg.Interval) - - case "statsd": - log.Printf("[INFO] Sending metrics to StatsD on %s as %q", cfg.StatsDAddr, prefix) - return gmStatsDRegistry(prefix, cfg.StatsDAddr, cfg.Interval) - - case "circonus": - return circonusRegistry(prefix, cfg.Circonus, cfg.Interval) - - default: - exit.Fatal("[FATAL] Invalid metrics target ", cfg.Target) - } - panic("unreachable") -} - -// parsePrefix parses the prefix metric template -func parsePrefix(tmpl string) (string, error) { - // Backward compatibility condition for old metrics.prefix parameter 'default' - if tmpl == "default" { - tmpl = DefaultPrefix - } - funcMap := template.FuncMap{ - "clean": clean, - } - t, err := template.New("prefix").Funcs(funcMap).Parse(tmpl) - if err != nil { - return "", err - } - host, err := hostname() - if err != nil { - return "", err - } - exe := filepath.Base(os.Args[0]) - - b := new(bytes.Buffer) - data := struct{ Hostname, Exec string }{host, exe} - if err := t.Execute(b, &data); err != nil { - return "", err - } - return b.String(), nil -} - -// parseNames parses the route metric name template. -func parseNames(tmpl string) (*template.Template, error) { - funcMap := template.FuncMap{ - "clean": clean, - } - t, err := template.New("names").Funcs(funcMap).Parse(tmpl) - if err != nil { - return nil, err - } - testURL, err := url.Parse("http://127.0.0.1:12345/") - if err != nil { - return nil, err - } - if _, err := TargetName("testservice", "test.example.com", "/test", testURL); err != nil { - return nil, err - } - return t, nil -} - -// TargetName returns the metrics name from the given parameters. -func TargetName(service, host, path string, targetURL *url.URL) (string, error) { - if names == nil { - return "", nil - } - - var name bytes.Buffer - - data := struct { - Service, Host, Path string - TargetURL *url.URL - }{service, host, path, targetURL} - - if err := names.Execute(&name, data); err != nil { - return "", err - } - - return name.String(), nil -} - -// clean creates safe names for graphite reporting by replacing -// some characters with underscores. -// TODO(fs): This may need updating for other metrics backends. -func clean(s string) string { - if s == "" { - return "_" - } - s = strings.Replace(s, ".", "_", -1) - s = strings.Replace(s, ":", "_", -1) - return strings.ToLower(s) -} - -// stubbed out for testing -var hostname = os.Hostname diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go deleted file mode 100644 index 8f8daf47f..000000000 --- a/metrics/metrics_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package metrics - -import ( - "net/url" - "os" - "testing" -) - -func TestParsePrefix(t *testing.T) { - hostname = func() (string, error) { return "myhost", nil } - os.Args = []string{"./myapp"} - got, err := parsePrefix("{{clean .Hostname}}.{{clean .Exec}}") - if err != nil { - t.Fatalf("%v", err) - } - want := "myhost.myapp" - if got != want { - t.Errorf("ParsePrefix: got %v want %v", got, want) - } - - got, err = parsePrefix("default") - if err != nil { - t.Fatalf("%v", err) - } - want = "myhost.myapp" - if got != want { - t.Errorf("ParsePrefix Old default style: got %v want %v", got, want) - } -} - -func TestTargetName(t *testing.T) { - tests := []struct { - service, host, path, target string - name string - }{ - {"s", "h", "p", "http://foo.com/bar", "s.h.p.foo_com"}, - {"s", "", "p", "http://foo.com/bar", "s._.p.foo_com"}, - {"s", "", "", "http://foo.com/bar", "s._._.foo_com"}, - {"", "", "", "http://foo.com/bar", "_._._.foo_com"}, - {"", "", "", "http://foo.com:1234/bar", "_._._.foo_com_1234"}, - {"", "", "", "http://1.2.3.4:1234/bar", "_._._.1_2_3_4_1234"}, - } - - for i, tt := range tests { - u, err := url.Parse(tt.target) - if err != nil { - t.Fatalf("%d: %v", i, err) - } - - got, err := TargetName(tt.service, tt.host, tt.path, u) - if err != nil { - t.Fatalf("%d: %v", i, err) - } - if want := tt.name; got != want { - t.Errorf("%d: got %q want %q", i, got, want) - } - } -} diff --git a/metrics/noop.go b/metrics/noop.go deleted file mode 100644 index 738517f9f..000000000 --- a/metrics/noop.go +++ /dev/null @@ -1,36 +0,0 @@ -package metrics - -import "time" - -// NoopRegistry is a stub implementation of the Registry interface. -type NoopRegistry struct{} - -func (p NoopRegistry) Names() []string { return nil } - -func (p NoopRegistry) Unregister(name string) {} - -func (p NoopRegistry) UnregisterAll() {} - -func (p NoopRegistry) GetCounter(name string) Counter { return noopCounter } - -func (p NoopRegistry) GetTimer(name string) Timer { return noopTimer } - -var noopCounter = NoopCounter{} - -// NoopCounter is a stub implementation of the Counter interface. -type NoopCounter struct{} - -func (c NoopCounter) Inc(n int64) {} - -var noopTimer = NoopTimer{} - -// NoopTimer is a stub implementation of the Timer interface. -type NoopTimer struct{} - -func (t NoopTimer) Update(time.Duration) {} - -func (t NoopTimer) UpdateSince(time.Time) {} - -func (t NoopTimer) Rate1() float64 { return 0 } - -func (t NoopTimer) Percentile(nth float64) float64 { return 0 } diff --git a/metrics/registry.go b/metrics/registry.go deleted file mode 100644 index e60924a30..000000000 --- a/metrics/registry.go +++ /dev/null @@ -1,54 +0,0 @@ -package metrics - -import "time" - -// Registry defines an interface for metrics values which -// can be implemented by different metrics libraries. -// An implementation must be safe to be used by multiple -// go routines. -type Registry interface { - // Names returns the list of registered metrics acquired - // through the GetXXX() functions. It should return them - // sorted in alphabetical order. - Names() []string - - // Unregister removes the registered metric and stops - // reporting it to an external backend. - Unregister(name string) - - // UnregisterAll removes all registered metrics and stops - // reporting them to an external backend. - UnregisterAll() - - // GetCounter returns a counter metric for the given name. - // If the metric does not exist yet it should be created - // otherwise the existing metric should be returned. - GetCounter(name string) Counter - - // GetTimer returns a timer metric for the given name. - // If the metric does not exist yet it should be created - // otherwise the existing metric should be returned. - GetTimer(name string) Timer -} - -// Counter defines a metric for counting events. -type Counter interface { - // Inc increases the counter value by 'n'. - Inc(n int64) -} - -// Timer defines a metric for counting and timing durations for events. -type Timer interface { - // Percentile returns the nth percentile of the duration. - Percentile(nth float64) float64 - - // Rate1 returns the 1min rate. - Rate1() float64 - - // Update counts an event and records the duration. - Update(time.Duration) - - // UpdateSince counts an event and records the duration - // as the delta between 'start' and the function is called. - UpdateSince(start time.Time) -} diff --git a/metrics4/circonus/metrics.go b/metrics4/circonus/metrics.go new file mode 100644 index 000000000..c5897a585 --- /dev/null +++ b/metrics4/circonus/metrics.go @@ -0,0 +1,147 @@ +package circonus + +import ( + "errors" + "fmt" + cgm "github.com/circonus-labs/circonus-gometrics" + "github.com/fabiolb/fabio/config" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/untagged" + "log" + "os" + "sync" + "time" +) + +var ( + metrics *cgm.CirconusMetrics + once sync.Once +) + +type Provider struct { + c *cgm.CirconusMetrics + prefix string +} + +func NewProvider(circonusCfg config.Circonus, interval time.Duration, prefix string) (metrics4.Provider, error) { + var initError error + var metrics *cgm.CirconusMetrics + + once.Do(func() { + if circonusCfg.APIKey == "" { + initError = errors.New("metrics: Circonus API token key") + return + } + + if circonusCfg.APIApp == "" { + circonusCfg.APIApp = metrics4.FabioNamespace + } + + host, err := os.Hostname() + if err != nil { + initError = fmt.Errorf("metrics: unable to initialize Circonus %s", err) + return + } + + cfg := &cgm.Config{} + + cfg.CheckManager.API.TokenKey = circonusCfg.APIKey + cfg.CheckManager.API.TokenApp = circonusCfg.APIApp + cfg.CheckManager.API.URL = circonusCfg.APIURL + cfg.CheckManager.Check.ID = circonusCfg.CheckID + cfg.CheckManager.Broker.ID = circonusCfg.BrokerID + cfg.Interval = fmt.Sprintf("%.0fs", interval.Seconds()) + cfg.CheckManager.Check.InstanceID = host + cfg.CheckManager.Check.DisplayName = fmt.Sprintf("%s /%s", host, metrics4.FabioNamespace) + cfg.CheckManager.Check.SearchTag = fmt.Sprintf("service:%s", metrics4.FabioNamespace) + + metrics, err := cgm.NewCirconusMetrics(cfg) + if err != nil { + initError = fmt.Errorf("metrics: unable to initialize Circonus %s", err) + return + } + + metrics.Start() + + log.Print("[INFO] Sending metrics to Circonus") + }) + + return &Provider{metrics, prefix}, initError +} + +func (p *Provider) NewCounter(name string, labelsNames ... string) metrics4.Counter { + name = getPrefixName(p.prefix, name) + if len(labelsNames) == 0 { + return &Counter{p.c, name} + } + return untagged.NewCounter(p, name, labelsNames) +} + +func (p *Provider) NewGauge(name string, labelsNames ... string) metrics4.Gauge { + name = getPrefixName(p.prefix, name) + if len(labelsNames) == 0 { + return &Gauge{p.c, name} + } + return untagged.NewGauge(p, name, labelsNames) +} + +func (p *Provider) NewTimer(name string, labelsNames ... string) metrics4.Timer { + name = getPrefixName(p.prefix, name) + if len(labelsNames) == 0 { + return &Timer{p.c.NewHistogram(name)} + } + return untagged.NewTimer(p, name, labelsNames) +} + +func (p *Provider) Close() error { + return nil +} + +func getPrefixName(prefix string, name string) string { + if len(prefix) == 0 { + return name + } + return fmt.Sprintf("%s`%s", prefix, name) +} + +type Counter struct { + metrics *cgm.CirconusMetrics + name string +} + +func (c *Counter) Add(value float64) { + c.metrics.Add(c.name, uint64(value)) +} + +func (c *Counter) With(labels ... string) metrics4.Counter { + return c +} + +type Gauge struct { + metrics *cgm.CirconusMetrics + name string +} + +func (g *Gauge) Add(value float64) { + g.metrics.Add(g.name, uint64(value)) +} + +func (g *Gauge) Set(value float64) { + g.metrics.Set(g.name, uint64(value)) +} + +func (g *Gauge) With(labels ... string) metrics4.Gauge { + return g +} + +type Timer struct { + h *cgm.Histogram +} + +func (t *Timer) Observe(duration float64) { + t.h.RecordValue(duration) +} + +func (g *Timer) With(labels ... string) metrics4.Timer { + return g +} diff --git a/metrics4/gm/gm.go b/metrics4/gm/gm.go new file mode 100644 index 000000000..349f5d16c --- /dev/null +++ b/metrics4/gm/gm.go @@ -0,0 +1,89 @@ +package gm + +import ( + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/untagged" + gm "github.com/rcrowley/go-metrics" + "time" +) + +type provider struct { + r gm.Registry +} + +func (p *provider) NewCounter(name string, labelsNames ...string) metrics4.Counter { + if len(labelsNames) == 0 { + return &counter{gm.GetOrRegisterCounter(name, p.r)} + } + return untagged.NewCounter(p, name, labelsNames) +} + +func (p *provider) NewGauge(name string, labelsNames ...string) metrics4.Gauge { + if len(labelsNames) == 0 { + return &gauge{gm.GetOrRegisterGaugeFloat64(name, p.r)} + } + return untagged.NewGauge(p, name, labelsNames) +} + +func (p *provider) NewTimer(name string, labelsNames ...string) metrics4.Timer { + if len(labelsNames) == 0 { + return &timer{gm.GetOrRegisterTimer(name, p.r)} + } + return untagged.NewTimer(p, name, labelsNames) +} + +func (p *provider) Close() error { + return nil +} + +func NewProvider(r gm.Registry) metrics4.Provider { + return &provider{r} +} + +type counter struct { + c gm.Counter +} + +func (c *counter) Add(value float64) { + c.c.Inc(int64(value)) +} + +func (c *counter) With(labels ... string) metrics4.Counter { + return c +} + +type gauge struct { + g gm.GaugeFloat64 +} + +func (g *gauge) Add(value float64) { + g.g.Update(g.g.Value() + value) +} + +func (g *gauge) Set(value float64) { + g.g.Update(value) +} + +func (g *gauge) With(labels ... string) metrics4.Gauge { + return g +} + +func NewGauge(g gm.GaugeFloat64) metrics4.Gauge { + return &gauge{g} +} + +type timer struct { + t gm.Timer +} + +func (t *timer) Observe(value float64) { + t.t.Update(time.Duration(value)) +} + +func (t *timer) With(labels ... string) metrics4.Timer { + return t +} + +func NewTimer(t gm.Timer) metrics4.Timer { + return &timer{t} +} diff --git a/metrics4/graphite/metrics.go b/metrics4/graphite/metrics.go new file mode 100644 index 000000000..36298d692 --- /dev/null +++ b/metrics4/graphite/metrics.go @@ -0,0 +1,30 @@ +package graphite + +import ( + "errors" + "fmt" + "github.com/cyberdelia/go-metrics-graphite" + "github.com/fabiolb/fabio/config" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/gm" + rcgm "github.com/rcrowley/go-metrics" + "net" + "time" +) + +func NewProvider(cfg config.Graphite, interval time.Duration, prefix string) (metrics4.Provider, error) { + if cfg.Addr == "" { + return nil, errors.New(" graphite addr missing") + } + + a, err := net.ResolveTCPAddr("tcp", cfg.Addr) + if err != nil { + return nil, fmt.Errorf(" cannot connect to Graphite: %s", err) + } + + registry := rcgm.NewRegistry() + + go graphite.Graphite(registry, interval, prefix, a) + + return gm.NewProvider(registry), nil +} \ No newline at end of file diff --git a/metrics4/graphite/metrics_test.go b/metrics4/graphite/metrics_test.go new file mode 100644 index 000000000..ce94e8659 --- /dev/null +++ b/metrics4/graphite/metrics_test.go @@ -0,0 +1,24 @@ +package graphite + +import ( + "github.com/fabiolb/fabio/config" + "testing" + "time" +) + +const addr = ":9876" + +// It shouldn't panic after creating several metrics with the same name +func TestIdenticalNamesForCounters(t *testing.T) { + metricName := "metric" + provider, err := NewProvider(config.Graphite{Interval: 1 * time.Second}) + + if err != nil { + t.Error(err) + } + + counter := provider.NewCounter(metricName) + counter.Add(1) + counter = provider.NewCounter(metricName) + counter.Add(1) +} diff --git a/metrics4/metrics.go b/metrics4/metrics.go new file mode 100644 index 000000000..26daee7ac --- /dev/null +++ b/metrics4/metrics.go @@ -0,0 +1,147 @@ +package metrics4 + +import ( + "github.com/go-kit/kit/metrics" + "io" +) + +const FabioNamespace = "fabio" + +type Counter = metrics.Counter + +type Gauge = metrics.Gauge + +type Timer = metrics.Histogram + +// Provider is an abstraction of a metrics backend. +type Provider interface { + // NewCounter creates a new counter object. + // labels - array of labels names + NewCounter(name string, labelsNames ... string) Counter + + // NewGauge creates a new gauge object. + NewGauge(name string, labelsNames ... string) Gauge + + // NewTimer creates a new timer object. + NewTimer(name string, labelsNames ... string) Timer + + // It extends Provider with Close method which closes a disposable objects that are connected with a provider. + io.Closer +} + +// MultiProvider wraps zero or more providers. +type MultiProvider struct { + p []Provider +} + +func NewMultiProvider(p []Provider) *MultiProvider { + return &MultiProvider{p} +} + +// NewCounter creates a MultiCounter with counter objects for all registered +// providers. +func (mp *MultiProvider) NewCounter(name string, labels ... string) Counter { + c := make([]Counter, len(mp.p)) + for i, p := range mp.p { + c[i] = p.NewCounter(name, labels...) + } + return &MultiCounter{c} +} + +// NewGauge creates a MultiGauge with gauge objects for all registered +// providers. +func (mp *MultiProvider) NewGauge(name string, labels ... string) Gauge { + g := make([]Gauge, len(mp.p)) + for i, p := range mp.p { + g[i] = p.NewGauge(name, labels...) + } + return &MultiGauge{g} +} + +// NewTimer creates a MultiTimer with timer objects for all registered +// providers. +func (mp *MultiProvider) NewTimer(name string, labels ... string) Timer { + t := make([]Timer, len(mp.p)) + for i, p := range mp.p { + t[i] = p.NewTimer(name, labels...) + } + return &MultiTimer{t} +} + +func (mp *MultiProvider) Close() error { + var errors []error + for _, p := range mp.p { + e := p.Close() + if e != nil { + errors = append(errors, e) + } + } + if len(errors) > 0 { + // TODO(max): Define MultiError + return errors[0] + } + return nil +} + +// MultiCounter wraps zero or more counters. +type MultiCounter struct { + counters []Counter +} + +func (mc *MultiCounter) Add(delta float64) { + for _, c := range mc.counters { + c.Add(delta) + } +} + +func (mc *MultiCounter) With(labelValues ... string) metrics.Counter { + labeledCounters := make([]Counter, len(mc.counters)) + for i, c := range mc.counters { + labeledCounters[i] = c.With(labelValues...) + } + return &MultiCounter{labeledCounters} +} + +// MultiGauge wraps zero or more gauges. +type MultiGauge struct { + gauges []Gauge +} + +func (mg *MultiGauge) Add(delta float64) { + for _, g := range mg.gauges { + g.Add(delta) + } +} + +func (mg *MultiGauge) Set(delta float64) { + for _, g := range mg.gauges { + g.Set(delta) + } +} + +func (mg *MultiGauge) With(labelValues ... string) metrics.Gauge { + labeledGauges := make([]Gauge, len(mg.gauges)) + for i, g := range mg.gauges { + labeledGauges[i] = g.With(labelValues...) + } + return &MultiGauge{labeledGauges} +} + +// MultiTimer wraps zero or more timers. +type MultiTimer struct { + timers []Timer +} + +func (mt *MultiTimer) Observe(duration float64) { + for _, t := range mt.timers { + t.Observe(duration) + } +} + +func (mt *MultiTimer) With(labelValues ... string) Timer { + labeledTimers := make([]Timer, len(mt.timers)) + for i, t := range mt.timers { + labeledTimers[i] = t.With(labelValues...) + } + return &MultiTimer{labeledTimers} +} diff --git a/metrics4/names/names.go b/metrics4/names/names.go new file mode 100644 index 000000000..730738348 --- /dev/null +++ b/metrics4/names/names.go @@ -0,0 +1,14 @@ +package names + +import "net/url" + +type Service struct { + Service string + Host string + Path string + TargetURL *url.URL +} + +func (s Service) String() string { + return s.Service +} diff --git a/metrics4/noop.go b/metrics4/noop.go new file mode 100644 index 000000000..212778ef5 --- /dev/null +++ b/metrics4/noop.go @@ -0,0 +1,37 @@ +package metrics4 + +import ( + "github.com/go-kit/kit/metrics" +) + +var noopCounter = NoopCounter{} + +type NoopCounter struct{} + +func (c *NoopCounter) Add(float64) {} + +func (c *NoopCounter) With(labels ... string) metrics.Counter { + return c +} + +var noopTimer = NoopTimer{} + +type NoopTimer struct{} + +func (t *NoopTimer) Observe(float64) {} + +func (t *NoopTimer) With(labels ... string) Timer { + return t +} + +var noopGauge = NoopGauge{} + +type NoopGauge struct{} + +func (g *NoopGauge) Add(float64) {} + +func (g *NoopGauge) Set(float64) {} + +func (g *NoopGauge) With(... string) metrics.Gauge { + return g +} diff --git a/metrics4/prefix/prefix.go b/metrics4/prefix/prefix.go new file mode 100644 index 000000000..1d4a4cf82 --- /dev/null +++ b/metrics4/prefix/prefix.go @@ -0,0 +1,74 @@ +package prefix + +import ( + "bytes" + "os" + "path/filepath" + "strings" + "sync" + "text/template" +) + +const DotDelimiter = "." + +const DefaultPrefix = "{{clean .Hostname}}.{{clean .Exec}}" + +var ( + prefix string + once sync.Once +) + +// clean creates safe prefix for graphite reporting by replacing +// some characters with underscores. +// TODO(fs): This may need updating for other metrics backends. +func clean(s string) string { + if s == "" { + return "_" + } + s = strings.Replace(s, ".", "_", -1) + s = strings.Replace(s, ":", "_", -1) + return strings.ToLower(s) +} + +func InitPrefix(tmpl string) { + once.Do(func() { + // Backward compatibility condition for old metrics.prefix parameter 'default' + if tmpl == "default" { + tmpl = DefaultPrefix + } + funcMap := template.FuncMap{ + "clean": clean, + } + t, err := template.New("prefix").Funcs(funcMap).Parse(tmpl) + if err != nil { + panic(err) + } + host, err := hostname() + if err != nil { + panic(err) + } + exe := filepath.Base(os.Args[0]) + + b := new(bytes.Buffer) + data := struct{ Hostname, Exec string }{host, exe} + if err := t.Execute(b, &data); err != nil { + panic(err) + } + prefix = b.String() + }) +} + + +func GetPrefix() string { + return prefix +} + +func GetPrefixedName(name string) string { + if len(prefix) == 0 { + return name + } + return prefix + "." + name +} + +// stubbed out for testing +var hostname = os.Hostname diff --git a/metrics4/prometheus/metrics.go b/metrics4/prometheus/metrics.go new file mode 100644 index 000000000..790c061c3 --- /dev/null +++ b/metrics4/prometheus/metrics.go @@ -0,0 +1,78 @@ +package prometheus + +import ( + "github.com/fabiolb/fabio/metrics4" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" + "strings" + "sync" +) + +type Provider struct { + counters map[string]metrics4.Counter + gauges map[string]metrics4.Gauge + timers map[string]metrics4.Timer + mutex sync.Mutex + prefix string +} + +func normalizeName(name string) string { + name = strings.Replace(name, ".", "_", -1) + name = strings.Replace(name, "-", "_", -1) + return name +} + +func NewProvider(prefix string) metrics4.Provider { + return &Provider{ + counters: make(map[string]metrics4.Counter), + gauges: make(map[string]metrics4.Gauge), + timers: make(map[string]metrics4.Timer), + prefix: prefix, + } +} + +func (p *Provider) NewCounter(name string, labels ... string) metrics4.Counter { + name = normalizeName(name) + p.mutex.Lock() + defer p.mutex.Unlock() + if p.counters[name] == nil { + p.counters[name] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: normalizeName(p.prefix), + Name: name, + }, labels) + } + + return p.counters[name] +} + +func (p *Provider) NewGauge(name string, labels ... string) metrics4.Gauge { + name = normalizeName(name) + p.mutex.Lock() + defer p.mutex.Unlock() + if p.gauges[name] == nil { + p.gauges[name] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: normalizeName(p.prefix), + Name: name, + }, labels) + } + + return p.gauges[name] +} + +func (p *Provider) NewTimer(name string, labels ... string) metrics4.Timer { + name = normalizeName(name) + p.mutex.Lock() + defer p.mutex.Unlock() + if p.timers[name] == nil { + p.timers[name] = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: normalizeName(p.prefix), + Name: name, + }, labels) + } + + return p.timers[name] +} + +func (p *Provider) Close() error { + return nil +} diff --git a/metrics4/statsd/metrics.go b/metrics4/statsd/metrics.go new file mode 100644 index 000000000..3a706df93 --- /dev/null +++ b/metrics4/statsd/metrics.go @@ -0,0 +1,57 @@ +package statsd + +import ( + "github.com/fabiolb/fabio/config" + "github.com/fabiolb/fabio/metrics4/untagged" + "github.com/go-kit/kit/log" + "time" + + "github.com/fabiolb/fabio/metrics4" + "github.com/go-kit/kit/metrics/statsd" +) + +type Provider struct { + client *statsd.Statsd + ticker *time.Ticker + sampleRate float64 +} + +func NewProvider(cfg config.StatsD, interval time.Duration, prefix string) metrics4.Provider { + if len(prefix) != 0 { + prefix = prefix + "." + } + + client := statsd.New(prefix, log.NewNopLogger()) + + ticker := time.NewTicker(interval) + + go client.SendLoop(ticker.C, "udp", cfg.Addr) + + return &Provider{client, ticker, cfg.SampleRate} +} + +func (p *Provider) NewCounter(name string, labelsNames ...string) metrics4.Counter { + if len(labelsNames) == 0 { + return p.client.NewCounter(name, p.sampleRate) + } + return untagged.NewCounter(p, name, labelsNames) +} + +func (p *Provider) NewGauge(name string, labelsNames ...string) metrics4.Gauge { + if len(labelsNames) == 0 { + return p.client.NewGauge(name) + } + return untagged.NewGauge(p, name, labelsNames) +} + +func (p *Provider) NewTimer(name string, labelsNames ...string) metrics4.Timer { + if len(labelsNames) == 0 { + return p.client.NewTiming(name, p.sampleRate) + } + return untagged.NewTimer(p, name, labelsNames) +} + +func (p *Provider) Close() error { + p.ticker.Stop() + return nil +} diff --git a/metrics4/statsd/metrics_test.go b/metrics4/statsd/metrics_test.go new file mode 100644 index 000000000..5bcf90aa2 --- /dev/null +++ b/metrics4/statsd/metrics_test.go @@ -0,0 +1,118 @@ +package statsd + +import ( + "github.com/fabiolb/fabio/metrics4" + "net" + "testing" + "time" +) + +const addr = ":9876" + +// It shouldn't panic after creating several metrics with the same name +func TestIdenticalNamesForCounters(t *testing.T) { + metricName := "metric" + provider, err := NewProvider("addr", 5 * time.Second) + + if err != nil { + t.Error(err) + } + + counter := provider.NewCounter(metricName) + counter.Add(1) + counter = provider.NewCounter(metricName) + counter.Add(1) +} + +func createProvider(t *testing.T, addr string, interval time.Duration) metrics4.Provider { + provider, err := NewProvider(addr, time.Second) + if err != nil { + t.Fatal(err) + } + return provider +} + +func createUdpConnection(t *testing.T, addr string) net.PacketConn { + conn, err := net.ListenPacket("udp", addr) + if err != nil { + t.Fatal(err) + } + return conn +} + +func readStringFromUdpConnection(t *testing.T, conn net.PacketConn, length int) string { + buffer := make([]byte, length) + read, _, err := conn.ReadFrom(buffer) + if err != nil { + t.Fatal(err) + } + return string(buffer[:read]) +} + +func startTimeout(t *testing.T, duration time.Duration) { + go func() { + timer := time.NewTimer(duration) + <-timer.C + t.Fatal("timeout") + }() +} + +func TestLabeledCounters(t *testing.T) { + counterMessage := "fabio_counter_code_200:1.000000|c\n" + + provider := createProvider(t, addr, time.Second) + defer provider.Close() + + conn := createUdpConnection(t, addr) + defer conn.Close() + + startTimeout(t, 5 * time.Second) + + provider.NewCounter("counter", "code").With("code", "200").Add(1) + + message := readStringFromUdpConnection(t, conn, len(counterMessage)) + + if message != counterMessage { + t.Fatalf("Unexpected message:\nGot:\t%s\nExpected:\t%s\n", message, counterMessage) + } +} + +func TestLabeledTimers(t *testing.T) { + timerMessage := "fabio_timer_code_200:0.500000|ms" + + provider := createProvider(t, addr, time.Second) + defer provider.Close() + + conn := createUdpConnection(t, addr) + defer conn.Close() + + startTimeout(t, 5 * time.Second) + + provider.NewTimer("timer", "code").With("code", "200").Observe(0.5) + + message := readStringFromUdpConnection(t, conn, len(timerMessage)) + + if message != timerMessage { + t.Fatalf("Unexpected message:\nGot:\t%s\nExpected:\t%s\n", message, timerMessage) + } +} + +func TestLabeledGauges(t *testing.T) { + gaugeMessage := "fabio_gauge_code_200:5.000000|g" + + provider := createProvider(t, addr, time.Second) + defer provider.Close() + + conn := createUdpConnection(t, addr) + defer conn.Close() + + startTimeout(t, 5 * time.Second) + + provider.NewGauge("gauge", "code").With("code", "200").Add(5) + + message := readStringFromUdpConnection(t, conn, len(gaugeMessage)) + + if message != gaugeMessage { + t.Fatalf("Unexpected message:\nGot:\t%s\nExpected:\t%s\n", message, gaugeMessage) + } +} diff --git a/metrics4/stdout/metrics.go b/metrics4/stdout/metrics.go new file mode 100644 index 000000000..d5c64b3e6 --- /dev/null +++ b/metrics4/stdout/metrics.go @@ -0,0 +1,20 @@ +package stdout + +import ( + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/gm" + rcgm "github.com/rcrowley/go-metrics" + "log" + "os" + "time" +) + +func NewProvider(interval time.Duration) metrics4.Provider { + logger := log.New(os.Stdout, "localhost: ", log.Lmicroseconds) + + r := rcgm.NewRegistry() + + go rcgm.Log(r, interval, logger) + + return gm.NewProvider(r) +} diff --git a/metrics4/untagged/untagged.go b/metrics4/untagged/untagged.go new file mode 100644 index 000000000..d4bd08222 --- /dev/null +++ b/metrics4/untagged/untagged.go @@ -0,0 +1,98 @@ +package untagged + +import ( + "errors" + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/prefix" + "strings" +) + +// This module provides Counter, Gauge, Timer for metric tools which don't support tags (labels) + +func parseLabelsValues(labelsNames []string, labels []string) ([]string, error) { + labelsCount := len(labelsNames) + labelsValues := make([]string, labelsCount) + + for i := 0; i < labelsCount; i++ { + if labelsNames[i] != labels[(i * 2)] { + return nil, errors.New("incorrect label name") + } + + labelsValues[i] = labels[(i * 2) + 1] + } + + return labelsValues, nil +} + +func makeNameFromLabels(labelsNames []string, labels []string) string { + _, err := parseLabelsValues(labelsNames, labels) + if err != nil { + panic(err) + } + return strings.Join(labels, prefix.DotDelimiter) +} + +type metric struct { + p metrics4.Provider + name string + labelsNames []string +} + +func newMetric(p metrics4.Provider, name string, labelsNames []string) *metric { + return &metric{ + p, + name, + labelsNames, + } +} + +type counter struct { + m *metric +} + +func (c *counter) Add(delta float64) {} + +func (c *counter) With(labels ... string) metrics4.Counter { + return c.m.p.NewCounter(c.m.name + prefix.DotDelimiter + makeNameFromLabels(c.m.labelsNames, labels)) +} + +func NewCounter(p metrics4.Provider, name string, labelsNames []string) metrics4.Counter { + return &counter{ + newMetric(p, name, labelsNames), + } +} + +type timer struct { + m *metric +} + +func (t *timer) Observe(value float64) {} + +func (t *timer) With(labels ... string) metrics4.Timer { + return t.m.p.NewTimer(t.m.name + prefix.DotDelimiter + makeNameFromLabels(t.m.labelsNames, labels)) +} + +func NewTimer(p metrics4.Provider, name string, labelsNames []string) metrics4.Timer { + return &timer{ + newMetric(p, name, labelsNames), + } +} + +type gauge struct { + m *metric +} + +func (g *gauge) Add(value float64) {} + +func (g *gauge) Set(value float64) {} + +func (g *gauge) With(labels ... string) metrics4.Gauge { + return g.m.p.NewGauge(g.m.name + prefix.DotDelimiter + makeNameFromLabels(g.m.labelsNames, labels)) +} + +func NewGauge(p metrics4.Provider, name string, labelsNames []string) metrics4.Gauge { + return &gauge{ + newMetric(p, name, labelsNames), + } +} + diff --git a/metrics4/untagged/untagged_test.go b/metrics4/untagged/untagged_test.go new file mode 100644 index 000000000..baf474875 --- /dev/null +++ b/metrics4/untagged/untagged_test.go @@ -0,0 +1,23 @@ +package untagged + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestParseLabelsValues(t *testing.T) { + labelsNames := []string{ "a", "b" } + labels := []string { "a", "1", "b", "2" } + labelsValues, err := parseLabelsValues(labelsNames, labels) + if err != nil { + t.Fatal(err) + } + assert.EqualValues(t, []string{ "1", "2" }, labelsValues) +} + +func TestMakeNameFromLabels(t *testing.T) { + labelsNames := []string{ "a", "b" } + labels := []string { "a", "1", "b", "2" } + name := makeNameFromLabels(labelsNames, labels) + assert.EqualValues(t, "a_1_b_2", name) +} diff --git a/proxy/http_proxy.go b/proxy/http_proxy.go index b531373af..6c00f9598 100644 --- a/proxy/http_proxy.go +++ b/proxy/http_proxy.go @@ -14,7 +14,7 @@ import ( "github.com/fabiolb/fabio/config" "github.com/fabiolb/fabio/logger" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/noroute" "github.com/fabiolb/fabio/proxy/gzip" "github.com/fabiolb/fabio/route" @@ -44,11 +44,17 @@ type HTTPProxy struct { Lookup func(*http.Request) *route.Target // Requests is a timer metric which is updated for every request. - Requests metrics.Timer + Requests metrics4.Timer // Noroute is a counter metric which is updated for every request // where Lookup() returns nil. - Noroute metrics.Counter + Noroute metrics4.Counter + + // WSConn counts the number of open web socket connections. + WSConn metrics4.Gauge + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider // Logger is the access logger for the requests. Logger logger.Logger @@ -63,6 +69,11 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { panic("no lookup function") } + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Config.RequestID != "" { id := p.UUID if id == nil { @@ -101,10 +112,7 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if t.RedirectCode != 0 { http.Redirect(w, r, t.RedirectURL.String(), t.RedirectCode) - if t.Timer != nil { - t.Timer.Update(0) - } - metrics.DefaultRegistry.GetTimer(key(t.RedirectCode)).Update(0) + metrics.NewCounter("http.status").With("code", strconv.Itoa(t.RedirectCode)).Add(1) return } @@ -155,13 +163,13 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case upgrade == "websocket" || upgrade == "Websocket": r.URL = targetURL + dial := net.Dial if targetURL.Scheme == "https" || targetURL.Scheme == "wss" { - h = newRawProxy(targetURL.Host, func(network, address string) (net.Conn, error) { + dial = func(network, address string) (net.Conn, error) { return tls.Dial(network, address, tr.(*http.Transport).TLSClientConfig) - }) - } else { - h = newRawProxy(targetURL.Host, net.Dial) + } } + h = newRawProxy(targetURL.Host, dial, p.WSConn) case accept == "text/event-stream": // use the flush interval for SSE (server-sent events) @@ -188,16 +196,13 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { dur := end.Sub(start) if p.Requests != nil { - p.Requests.Update(dur) - } - if t.Timer != nil { - t.Timer.Update(dur) + p.Requests.Observe(float64(dur.Nanoseconds()) / float64(time.Millisecond)) } if rw.code <= 0 { return } - metrics.DefaultRegistry.GetTimer(key(rw.code)).Update(dur) + metrics.NewTimer("http.status", "code").With("code", strconv.Itoa(rw.code)).Observe(float64(dur.Nanoseconds()) / float64(time.Millisecond)) // write access log if p.Logger != nil { diff --git a/proxy/http_raw_handler.go b/proxy/http_raw_handler.go index a2cf52e30..558c411a2 100644 --- a/proxy/http_raw_handler.go +++ b/proxy/http_raw_handler.go @@ -1,26 +1,30 @@ package proxy import ( + "github.com/fabiolb/fabio/metrics4" "io" "log" "net" "net/http" - - "github.com/fabiolb/fabio/metrics" + "sync/atomic" ) -// conn measures the number of open web socket connections -var conn = metrics.DefaultRegistry.GetCounter("ws.conn") +// conns keeps track of the number of open ws connections +var conns int64 type dialFunc func(network, address string) (net.Conn, error) // newRawProxy returns an HTTP handler which forwards data between // an incoming and outgoing TCP connection including the original request. // This handler establishes a new outgoing connection per request. -func newRawProxy(host string, dial dialFunc) http.Handler { +func newRawProxy(host string, dial dialFunc, conn metrics4.Gauge) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn.Inc(1) - defer func() { conn.Inc(-1) }() + if conn != nil { + conn.Set(float64(atomic.AddInt64(&conns, 1))) + defer func() { + conn.Set(float64(atomic.AddInt64(&conns, -1))) + }() + } hj, ok := w.(http.Hijacker) if !ok { diff --git a/proxy/tcp/copy_buffer.go b/proxy/tcp/copy_buffer.go index c06c50fd1..cfba3c1ce 100644 --- a/proxy/tcp/copy_buffer.go +++ b/proxy/tcp/copy_buffer.go @@ -3,12 +3,12 @@ package tcp import ( "io" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" ) // copyBuffer is an adapted version of io.copyBuffer which updates a // counter instead of returning the total bytes written. -func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { +func copyBuffer(dst io.Writer, src io.Reader, c metrics4.Counter) (err error) { buf := make([]byte, 32*1024) for { nr, er := src.Read(buf) @@ -16,7 +16,7 @@ func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { if c != nil { - c.Inc(int64(nw)) + c.Add(float64(nw)) } } if ew != nil { diff --git a/proxy/tcp/sni_proxy.go b/proxy/tcp/sni_proxy.go index d9920876c..20bee7e19 100644 --- a/proxy/tcp/sni_proxy.go +++ b/proxy/tcp/sni_proxy.go @@ -7,7 +7,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -26,20 +26,28 @@ type SNIProxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *SNIProxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Add(1) } tlsReader := bufio.NewReader(in) @@ -47,7 +55,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[DEBUG] tcp+sni: TLS handshake failed (failed to peek data)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } @@ -56,7 +64,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } @@ -66,7 +74,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Printf("[DEBUG] tcp+sni: TLS handshake failed (%s)", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } @@ -77,7 +85,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if !ok { log.Print("[DEBUG] tcp+sni: TLS handshake failed (unable to parse client hello)") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return nil } @@ -85,7 +93,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if host == "" { log.Print("[DEBUG] tcp+sni: server_name missing") if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return nil } @@ -93,7 +101,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { t := p.Lookup(host) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Add(1) } return nil } @@ -107,7 +115,7 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } @@ -118,23 +126,23 @@ func (p *SNIProxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp+sni: copy client hello failed. ", err) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") // we've received the ClientHello already - rx.Inc(int64(n)) + rx.Add(float64(n)) go cp(in, out, rx) go cp(out, in, tx) diff --git a/proxy/tcp/tcp_proxy.go b/proxy/tcp/tcp_proxy.go index 17bf69572..785c1bbe8 100644 --- a/proxy/tcp/tcp_proxy.go +++ b/proxy/tcp/tcp_proxy.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/metrics4" "github.com/fabiolb/fabio/route" ) @@ -21,20 +21,28 @@ type Proxy struct { Lookup func(host string) *route.Target // Conn counts the number of connections. - Conn metrics.Counter + Conn metrics4.Counter // ConnFail counts the failed upstream connection attempts. - ConnFail metrics.Counter + ConnFail metrics4.Counter // Noroute counts the failed Lookup() calls. - Noroute metrics.Counter + Noroute metrics4.Counter + + // Metrics is the configured metrics backend provider. + Metrics metrics4.Provider } func (p *Proxy) ServeTCP(in net.Conn) error { defer in.Close() + metrics := p.Metrics + if metrics == nil { + metrics = &metrics4.MultiProvider{} + } + if p.Conn != nil { - p.Conn.Inc(1) + p.Conn.Add(1) } _, port, _ := net.SplitHostPort(in.LocalAddr().String()) @@ -42,7 +50,7 @@ func (p *Proxy) ServeTCP(in net.Conn) error { t := p.Lookup(port) if t == nil { if p.Noroute != nil { - p.Noroute.Inc(1) + p.Noroute.Add(1) } return nil } @@ -56,21 +64,21 @@ func (p *Proxy) ServeTCP(in net.Conn) error { if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { - p.ConnFail.Inc(1) + p.ConnFail.Add(1) } return err } defer out.Close() errc := make(chan error, 2) - cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + cp := func(dst io.Writer, src io.Reader, c metrics4.Counter) { errc <- copyBuffer(dst, src, c) } // rx measures the traffic to the upstream server (in <- out) // tx measures the traffic from the upstream server (out <- in) - rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") - tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + rx := metrics.NewCounter(t.TimerName.String() + ".rx") + tx := metrics.NewCounter(t.TimerName.String() + ".tx") go cp(in, out, rx) go cp(out, in, tx) diff --git a/route/route.go b/route/route.go index 0f9e35dc9..7a59958a8 100644 --- a/route/route.go +++ b/route/route.go @@ -2,6 +2,7 @@ package route import ( "fmt" + "github.com/fabiolb/fabio/metrics4/names" "log" "net/url" "reflect" @@ -9,7 +10,6 @@ import ( "strconv" "strings" - "github.com/fabiolb/fabio/metrics" "github.com/gobwas/glob" ) @@ -54,22 +54,21 @@ func (r *Route) addTarget(service string, targetURL *url.URL, fixedWeight float6 } } - name, err := metrics.TargetName(service, r.Host, r.Path, targetURL) - if err != nil { - log.Printf("[ERROR] Invalid metrics name: %s", err) - name = "unknown" - } - t := &Target{ Service: service, Tags: tags, Opts: opts, URL: targetURL, FixedWeight: fixedWeight, - Timer: ServiceRegistry.GetTimer(name), - TimerName: name, + TimerName: names.Service{ + Service: service, + Host: r.Host, + Path: r.Path, + TargetURL: targetURL, + }, } + var err error if opts != nil { t.StripPath = opts["strip"] t.TLSSkipVerify = opts["tlsskipverify"] == "true" diff --git a/route/table.go b/route/table.go index c417e2c34..3da5a219f 100644 --- a/route/table.go +++ b/route/table.go @@ -9,10 +9,8 @@ import ( "net/url" "sort" "strings" - "sync" "sync/atomic" - "github.com/fabiolb/fabio/metrics" "github.com/gobwas/glob" ) @@ -23,9 +21,6 @@ var errNoMatch = errors.New("route: no target match") // table stores the active routing table. Must never be nil. var table atomic.Value -// ServiceRegistry stores the metrics for the services. -var ServiceRegistry metrics.Registry = metrics.NoopRegistry{} - // init initializes the routing table. func init() { table.Store(make(Table)) @@ -38,9 +33,6 @@ func GetTable() Table { return table.Load().(Table) } -// mu guards table and registry in SetTable. -var mu sync.Mutex - // SetTable sets the active routing table. A nil value // logs a warning and is ignored. The function is safe // to be called from multiple goroutines. @@ -49,42 +41,7 @@ func SetTable(t Table) { log.Print("[WARN] Ignoring nil routing table") return } - mu.Lock() table.Store(t) - syncRegistry(t) - mu.Unlock() -} - -// syncRegistry unregisters all inactive timers. -// It assumes that all timers of the table have -// already been registered. -func syncRegistry(t Table) { - timers := map[string]bool{} - - // get all registered timers - for _, name := range ServiceRegistry.Names() { - timers[name] = false - } - - // mark the ones from this table as active. - // this can also add new entries but we do not - // really care since we are only interested in the - // inactive ones. - for _, routes := range t { - for _, r := range routes { - for _, tg := range r.Targets { - timers[tg.TimerName] = true - } - } - } - - // unregister inactive timers - for name, active := range timers { - if !active { - ServiceRegistry.Unregister(name) - log.Printf("[INFO] Unregistered timer %s", name) - } - } } // Table contains a set of routes grouped by host. diff --git a/route/table_registry_test.go b/route/table_registry_test.go index 3616dd9c5..5607fe3fe 100644 --- a/route/table_registry_test.go +++ b/route/table_registry_test.go @@ -1,63 +1,55 @@ package route -import ( - "reflect" - "sort" - "testing" - - "github.com/fabiolb/fabio/metrics" -) - -func TestSyncRegistry(t *testing.T) { - oldRegistry := ServiceRegistry - ServiceRegistry = newStubRegistry() - defer func() { ServiceRegistry = oldRegistry }() - - tbl := make(Table) - tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) - tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } - - tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) - syncRegistry(tbl) - if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { - t.Fatalf("got %v want %v", got, want) - } -} - -func newStubRegistry() metrics.Registry { - return &stubRegistry{names: make(map[string]bool)} -} - -type stubRegistry struct { - names map[string]bool -} - -func (p *stubRegistry) Names() []string { - n := []string{} - for k := range p.names { - n = append(n, k) - } - sort.Strings(n) - return n -} - -func (p *stubRegistry) Unregister(name string) { - delete(p.names, name) -} - -func (p *stubRegistry) UnregisterAll() { - p.names = map[string]bool{} -} - -func (p *stubRegistry) GetCounter(name string) metrics.Counter { - p.names[name] = true - return metrics.NoopCounter{} -} - -func (p *stubRegistry) GetTimer(name string) metrics.Timer { - p.names[name] = true - return metrics.NoopTimer{} -} +// func TestSyncRegistry(t *testing.T) { +// oldRegistry := ServiceRegistry +// ServiceRegistry = newStubRegistry() +// defer func() { ServiceRegistry = oldRegistry }() +// +// tbl := make(Table) +// tbl.addRoute(&RouteDef{Service: "svc-a", Src: "/aaa", Dst: "http://localhost:1234", Weight: 1}) +// tbl.addRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678", Weight: 1}) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234", "svc-b._./bbb.localhost_5678"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// +// tbl.delRoute(&RouteDef{Service: "svc-b", Src: "/bbb", Dst: "http://localhost:5678"}) +// syncRegistry(tbl) +// if got, want := ServiceRegistry.Names(), []string{"svc-a._./aaa.localhost_1234"}; !reflect.DeepEqual(got, want) { +// t.Fatalf("got %v want %v", got, want) +// } +// } +// +// func newStubRegistry() metrics.Registry { +// return &stubRegistry{names: make(map[string]bool)} +// } +// +// type stubRegistry struct { +// names map[string]bool +// } +// +// func (p *stubRegistry) Names() []string { +// n := []string{} +// for k := range p.names { +// n = append(n, k) +// } +// sort.Strings(n) +// return n +// } +// +// func (p *stubRegistry) Unregister(name string) { +// delete(p.names, name) +// } +// +// func (p *stubRegistry) UnregisterAll() { +// p.names = map[string]bool{} +// } +// +// func (p *stubRegistry) GetCounter(name string) metrics.Counter { +// p.names[name] = true +// return metrics.NoopCounter{} +// } +// +// func (p *stubRegistry) GetTimer(name string) metrics.Timer { +// p.names[name] = true +// return metrics.NoopTimer{} +// } diff --git a/route/target.go b/route/target.go index db9e5244b..b5422893a 100644 --- a/route/target.go +++ b/route/target.go @@ -1,10 +1,10 @@ package route import ( + "github.com/fabiolb/fabio/metrics4" + "github.com/fabiolb/fabio/metrics4/names" "net/url" "strings" - - "github.com/fabiolb/fabio/metrics" ) type Target struct { @@ -49,11 +49,12 @@ type Target struct { // Weight is the actual weight for this service in percent. Weight float64 + // TODO(max): Isn't it the same as Requests timer in HTTPProxy // Timer measures throughput and latency of this target - Timer metrics.Timer + Timer metrics4.Timer // TimerName is the name of the timer in the metrics registry - TimerName string + TimerName names.Service // accessRules is map of access information for the target. accessRules map[string][]interface{} diff --git a/vendor/github.com/alexcesaro/statsd/CHANGELOG.md b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md new file mode 100644 index 000000000..04d811b71 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/CHANGELOG.md @@ -0,0 +1,64 @@ +# Change Log +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +## [2.0.0] - 2016-03-20 + +- `New` signature changed. The default address used is now ":8125". To use + another address use the `Address` option: + + Before: + ``` + statsd.New(":8125") + statsd.New(":9000") + ``` + + After + ``` + statsd.New() + statsd.New(statsd.Address(":9000")) + ``` + +- The `rate` parameter has been removed from the `Count` and `Timing` methods. + Use the new `SampleRate` option instead. + +- `Count`, `Gauge` and `Timing` now accept a `interface{}` instead of an int as + the value parameter. So you can now use any type of integer or float in these + functions. + +- The `WithInfluxDBTags` and `WithDatadogTags` options were replaced by the + `TagsFormat` and `Tags` options: + + Before: + ``` + statsd.New(statsd.WithInfluxDBTags("tag", "value")) + statsd.New(statsd.WithDatadogTags("tag", "value")) + ``` + + After + ``` + statsd.New(statsd.TagsFormat(statsd.InfluxDB), statsd.Tags("tag", "value")) + statsd.New(statsd.TagsFormat(statsd.Datadog), statsd.Tags("tag", "value")) + ``` + +- All options whose named began by `With` had the `With` stripped: + + Before: + ``` + statsd.New(statsd.WithMaxPacketSize(65000)) + ``` + + After + ``` + statsd.New(statsd.MaxPacketSize(65000)) + ``` + +- `ChangeGauge` has been removed as it is a bad practice: UDP packets can be + lost so using relative changes can cause unreliable values in the long term. + Use `Gauge` instead which sends an absolute value. + +- The `Histogram` method has been added. + +- The `Clone` method was added to the `Client`, it allows to create a new + `Client` with different rate / prefix / tags parameters while still using the + same connection. diff --git a/vendor/github.com/alexcesaro/statsd/LICENSE b/vendor/github.com/alexcesaro/statsd/LICENSE new file mode 100644 index 000000000..4ec7268d5 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2015 Alexandre Cesaro + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/alexcesaro/statsd/README.md b/vendor/github.com/alexcesaro/statsd/README.md new file mode 100644 index 000000000..774a1c687 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/README.md @@ -0,0 +1,50 @@ +# statsd +[![Build Status](https://travis-ci.org/alexcesaro/statsd.svg?branch=v2)](https://travis-ci.org/alexcesaro/statsd) [![Code Coverage](http://gocover.io/_badge/gopkg.in/alexcesaro/statsd.v2)](http://gocover.io/gopkg.in/alexcesaro/statsd.v2) [![Documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2?status.svg)](https://godoc.org/gopkg.in/alexcesaro/statsd.v2) + +## Introduction + +statsd is a simple and efficient [Statsd](https://github.com/etsy/statsd) +client. + +See the [benchmark](https://github.com/alexcesaro/statsdbench) for a comparison +with other Go StatsD clients. + +## Features + +- Supports all StatsD metrics: counter, gauge, timing and set +- Supports InfluxDB and Datadog tags +- Fast and GC-friendly: all functions for sending metrics do not allocate +- Efficient: metrics are buffered by default +- Simple and clean API +- 100% test coverage +- Versioned API using gopkg.in + + +## Documentation + +https://godoc.org/gopkg.in/alexcesaro/statsd.v2 + + +## Download + + go get gopkg.in/alexcesaro/statsd.v2 + + +## Example + +See the [examples in the documentation](https://godoc.org/gopkg.in/alexcesaro/statsd.v2#example-package). + + +## License + +[MIT](LICENSE) + + +## Contribute + +Do you have any question the documentation does not answer? Is there a use case +that you feel is common and is not well-addressed by the current API? + +If so you are more than welcome to ask questions in the +[thread on golang-nuts](https://groups.google.com/d/topic/golang-nuts/Tz6t4_iLgnw/discussion) +or open an issue or send a pull-request here on Github. diff --git a/vendor/github.com/alexcesaro/statsd/conn.go b/vendor/github.com/alexcesaro/statsd/conn.go new file mode 100644 index 000000000..4dbda6309 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/conn.go @@ -0,0 +1,270 @@ +package statsd + +import ( + "io" + "math/rand" + "net" + "strconv" + "sync" + "time" +) + +type conn struct { + // Fields settable with options at Client's creation. + addr string + errorHandler func(error) + flushPeriod time.Duration + maxPacketSize int + network string + tagFormat TagFormat + + mu sync.Mutex + // Fields guarded by the mutex. + closed bool + w io.WriteCloser + buf []byte + rateCache map[float32]string +} + +func newConn(conf connConfig, muted bool) (*conn, error) { + c := &conn{ + addr: conf.Addr, + errorHandler: conf.ErrorHandler, + flushPeriod: conf.FlushPeriod, + maxPacketSize: conf.MaxPacketSize, + network: conf.Network, + tagFormat: conf.TagFormat, + } + + if muted { + return c, nil + } + + var err error + c.w, err = dialTimeout(c.network, c.addr, 5*time.Second) + if err != nil { + return c, err + } + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. + if c.network[:3] == "udp" { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return c, err + } + } + } + + // To prevent a buffer overflow add some capacity to the buffer to allow for + // an additional metric. + c.buf = make([]byte, 0, c.maxPacketSize+200) + + if c.flushPeriod > 0 { + go func() { + ticker := time.NewTicker(c.flushPeriod) + for _ = range ticker.C { + c.mu.Lock() + if c.closed { + ticker.Stop() + c.mu.Unlock() + return + } + c.flush(0) + c.mu.Unlock() + } + }() + } + + return c, nil +} + +func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendNumber(n) + c.appendType(typ) + c.appendRate(rate) + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { + c.mu.Lock() + l := len(c.buf) + // To set a gauge to a negative value we must first set it to 0. + // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges + if isNegative(value) { + c.appendBucket(prefix, bucket, tags) + c.appendGauge(0, tags) + } + c.appendBucket(prefix, bucket, tags) + c.appendGauge(value, tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendGauge(value interface{}, tags string) { + c.appendNumber(value) + c.appendType("g") + c.closeMetric(tags) +} + +func (c *conn) unique(prefix, bucket string, value string, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + c.appendString(value) + c.appendType("s") + c.closeMetric(tags) + c.flushIfBufferFull(l) + c.mu.Unlock() +} + +func (c *conn) appendByte(b byte) { + c.buf = append(c.buf, b) +} + +func (c *conn) appendString(s string) { + c.buf = append(c.buf, s...) +} + +func (c *conn) appendNumber(v interface{}) { + switch n := v.(type) { + case int: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int64: + c.buf = strconv.AppendInt(c.buf, n, 10) + case uint64: + c.buf = strconv.AppendUint(c.buf, n, 10) + case int32: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint32: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int16: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint16: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case int8: + c.buf = strconv.AppendInt(c.buf, int64(n), 10) + case uint8: + c.buf = strconv.AppendUint(c.buf, uint64(n), 10) + case float64: + c.buf = strconv.AppendFloat(c.buf, n, 'f', -1, 64) + case float32: + c.buf = strconv.AppendFloat(c.buf, float64(n), 'f', -1, 32) + } +} + +func isNegative(v interface{}) bool { + switch n := v.(type) { + case int: + return n < 0 + case uint: + return n < 0 + case int64: + return n < 0 + case uint64: + return n < 0 + case int32: + return n < 0 + case uint32: + return n < 0 + case int16: + return n < 0 + case uint16: + return n < 0 + case int8: + return n < 0 + case uint8: + return n < 0 + case float64: + return n < 0 + case float32: + return n < 0 + } + return false +} + +func (c *conn) appendBucket(prefix, bucket string, tags string) { + c.appendString(prefix) + c.appendString(bucket) + if c.tagFormat == InfluxDB { + c.appendString(tags) + } + c.appendByte(':') +} + +func (c *conn) appendType(t string) { + c.appendByte('|') + c.appendString(t) +} + +func (c *conn) appendRate(rate float32) { + if rate == 1 { + return + } + if c.rateCache == nil { + c.rateCache = make(map[float32]string) + } + + c.appendString("|@") + if s, ok := c.rateCache[rate]; ok { + c.appendString(s) + } else { + s = strconv.FormatFloat(float64(rate), 'f', -1, 32) + c.rateCache[rate] = s + c.appendString(s) + } +} + +func (c *conn) closeMetric(tags string) { + if c.tagFormat == Datadog { + c.appendString(tags) + } + c.appendByte('\n') +} + +func (c *conn) flushIfBufferFull(lastSafeLen int) { + if len(c.buf) > c.maxPacketSize { + c.flush(lastSafeLen) + } +} + +// flush flushes the first n bytes of the buffer. +// If n is 0, the whole buffer is flushed. +func (c *conn) flush(n int) { + if len(c.buf) == 0 { + return + } + if n == 0 { + n = len(c.buf) + } + + // Trim the last \n, StatsD does not like it. + _, err := c.w.Write(c.buf[:n-1]) + c.handleError(err) + if n < len(c.buf) { + copy(c.buf, c.buf[n:]) + } + c.buf = c.buf[:len(c.buf)-n] +} + +func (c *conn) handleError(err error) { + if err != nil && c.errorHandler != nil { + c.errorHandler(err) + } +} + +// Stubbed out for testing. +var ( + dialTimeout = net.DialTimeout + now = time.Now + randFloat = rand.Float32 +) diff --git a/vendor/github.com/alexcesaro/statsd/doc.go b/vendor/github.com/alexcesaro/statsd/doc.go new file mode 100644 index 000000000..bb7b986e3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/doc.go @@ -0,0 +1,29 @@ +/* +Package statsd is a simple and efficient StatsD client. + + +Options + +Use options to configure the Client: target host/port, sampling rate, tags, etc. + +Whenever you want to use different options (e.g. other tags, different sampling +rate), you should use the Clone() method of the Client. + +Because when cloning a Client, the same connection is reused so this is way +cheaper and more efficient than creating another Client using New(). + + +Internals + +Client's methods buffer metrics. The buffer is flushed when either: + - the background goroutine flushes the buffer (every 100ms by default) + - the buffer is full (1440 bytes by default so that IP packets are not + fragmented) + +The background goroutine can be disabled using the FlushPeriod(0) option. + +Buffering can be disabled using the MaxPacketSize(0) option. + +StatsD homepage: https://github.com/etsy/statsd +*/ +package statsd diff --git a/vendor/github.com/alexcesaro/statsd/options.go b/vendor/github.com/alexcesaro/statsd/options.go new file mode 100644 index 000000000..ef95bb8c3 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/options.go @@ -0,0 +1,250 @@ +package statsd + +import ( + "bytes" + "strings" + "time" +) + +type config struct { + Conn connConfig + Client clientConfig +} + +type clientConfig struct { + Muted bool + Rate float32 + Prefix string + Tags []tag +} + +type connConfig struct { + Addr string + ErrorHandler func(error) + FlushPeriod time.Duration + MaxPacketSize int + Network string + TagFormat TagFormat +} + +// An Option represents an option for a Client. It must be used as an +// argument to New() or Client.Clone(). +type Option func(*config) + +// Address sets the address of the StatsD daemon. +// +// By default, ":8125" is used. This option is ignored in Client.Clone(). +func Address(addr string) Option { + return Option(func(c *config) { + c.Conn.Addr = addr + }) +} + +// ErrorHandler sets the function called when an error happens when sending +// metrics (e.g. the StatsD daemon is not listening anymore). +// +// By default, these errors are ignored. This option is ignored in +// Client.Clone(). +func ErrorHandler(h func(error)) Option { + return Option(func(c *config) { + c.Conn.ErrorHandler = h + }) +} + +// FlushPeriod sets how often the Client's buffer is flushed. If p is 0, the +// goroutine that periodically flush the buffer is not lauched and the buffer +// is only flushed when it is full. +// +// By default, the flush period is 100 ms. This option is ignored in +// Client.Clone(). +func FlushPeriod(p time.Duration) Option { + return Option(func(c *config) { + c.Conn.FlushPeriod = p + }) +} + +// MaxPacketSize sets the maximum packet size in bytes sent by the Client. +// +// By default, it is 1440 to avoid IP fragmentation. This option is ignored in +// Client.Clone(). +func MaxPacketSize(n int) Option { + return Option(func(c *config) { + c.Conn.MaxPacketSize = n + }) +} + +// Network sets the network (udp, tcp, etc) used by the client. See the +// net.Dial documentation (https://golang.org/pkg/net/#Dial) for the available +// network options. +// +// By default, network is udp. This option is ignored in Client.Clone(). +func Network(network string) Option { + return Option(func(c *config) { + c.Conn.Network = network + }) +} + +// Mute sets whether the Client is muted. All methods of a muted Client do +// nothing and return immedialtly. +// +// This option can be used in Client.Clone() only if the parent Client is not +// muted. The clones of a muted Client are always muted. +func Mute(b bool) Option { + return Option(func(c *config) { + c.Client.Muted = b + }) +} + +// SampleRate sets the sample rate of the Client. It allows sending the metrics +// less often which can be useful for performance intensive code paths. +func SampleRate(rate float32) Option { + return Option(func(c *config) { + c.Client.Rate = rate + }) +} + +// Prefix appends the prefix that will be used in every bucket name. +// +// Note that when used in cloned, the prefix of the parent Client is not +// replaced but is prepended to the given prefix. +func Prefix(p string) Option { + return Option(func(c *config) { + c.Client.Prefix += strings.TrimSuffix(p, ".") + "." + }) +} + +// TagFormat represents the format of tags sent by a Client. +type TagFormat uint8 + +// TagsFormat sets the format of tags. +func TagsFormat(tf TagFormat) Option { + return Option(func(c *config) { + c.Conn.TagFormat = tf + }) +} + +// Tags appends the given tags to the tags sent with every metrics. If a tag +// already exists, it is replaced. +// +// The tags must be set as key-value pairs. If the number of tags is not even, +// Tags panics. +// +// If the format of tags have not been set using the TagsFormat option, the tags +// will be ignored. +func Tags(tags ...string) Option { + if len(tags)%2 != 0 { + panic("statsd: Tags only accepts an even number of arguments") + } + + return Option(func(c *config) { + if len(tags) == 0 { + return + } + + newTags := make([]tag, len(tags)/2) + for i := 0; i < len(tags)/2; i++ { + newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} + } + + for _, newTag := range newTags { + exists := false + for _, oldTag := range c.Client.Tags { + if newTag.K == oldTag.K { + exists = true + oldTag.V = newTag.V + } + } + if !exists { + c.Client.Tags = append(c.Client.Tags, tag{ + K: newTag.K, + V: newTag.V, + }) + } + } + }) +} + +type tag struct { + K, V string +} + +func joinTags(tf TagFormat, tags []tag) string { + if len(tags) == 0 || tf == 0 { + return "" + } + join := joinFuncs[tf] + return join(tags) +} + +func splitTags(tf TagFormat, tags string) []tag { + if len(tags) == 0 || tf == 0 { + return nil + } + split := splitFuncs[tf] + return split(tags) +} + +const ( + // InfluxDB tag format. + // See https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB TagFormat = iota + 1 + // Datadog tag format. + // See http://docs.datadoghq.com/guides/metrics/#tags + Datadog +) + +var ( + joinFuncs = map[TagFormat]func([]tag) string{ + // InfluxDB tag format: ,tag1=payroll,region=us-west + // https://influxdb.com/blog/2015/11/03/getting_started_with_influx_statsd.html + InfluxDB: func(tags []tag) string { + var buf bytes.Buffer + for _, tag := range tags { + _ = buf.WriteByte(',') + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte('=') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + // Datadog tag format: |#tag1:value1,tag2:value2 + // http://docs.datadoghq.com/guides/dogstatsd/#datagram-format + Datadog: func(tags []tag) string { + buf := bytes.NewBufferString("|#") + first := true + for _, tag := range tags { + if first { + first = false + } else { + _ = buf.WriteByte(',') + } + _, _ = buf.WriteString(tag.K) + _ = buf.WriteByte(':') + _, _ = buf.WriteString(tag.V) + } + return buf.String() + }, + } + splitFuncs = map[TagFormat]func(string) []tag{ + InfluxDB: func(s string) []tag { + s = s[1:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, "=") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + Datadog: func(s string) []tag { + s = s[2:] + pairs := strings.Split(s, ",") + tags := make([]tag, len(pairs)) + for i, pair := range pairs { + kv := strings.Split(pair, ":") + tags[i] = tag{K: kv[0], V: kv[1]} + } + return tags + }, + } +) diff --git a/vendor/github.com/alexcesaro/statsd/statsd.go b/vendor/github.com/alexcesaro/statsd/statsd.go new file mode 100644 index 000000000..f19204d79 --- /dev/null +++ b/vendor/github.com/alexcesaro/statsd/statsd.go @@ -0,0 +1,169 @@ +package statsd + +import "time" + +// A Client represents a StatsD client. +type Client struct { + conn *conn + muted bool + rate float32 + prefix string + tags string +} + +// New returns a new Client. +func New(opts ...Option) (*Client, error) { + // The default configuration. + conf := &config{ + Client: clientConfig{ + Rate: 1, + }, + Conn: connConfig{ + Addr: ":8125", + FlushPeriod: 100 * time.Millisecond, + // Worst-case scenario: + // Ethernet MTU - IPv6 Header - TCP Header = 1500 - 40 - 20 = 1440 + MaxPacketSize: 1440, + Network: "udp", + }, + } + for _, o := range opts { + o(conf) + } + + conn, err := newConn(conf.Conn, conf.Client.Muted) + c := &Client{ + conn: conn, + muted: conf.Client.Muted, + } + if err != nil { + c.muted = true + return c, err + } + c.rate = conf.Client.Rate + c.prefix = conf.Client.Prefix + c.tags = joinTags(conf.Conn.TagFormat, conf.Client.Tags) + return c, nil +} + +// Clone returns a clone of the Client. The cloned Client inherits its +// configuration from its parent. +// +// All cloned Clients share the same connection, so cloning a Client is a cheap +// operation. +func (c *Client) Clone(opts ...Option) *Client { + tf := c.conn.tagFormat + conf := &config{ + Client: clientConfig{ + Rate: c.rate, + Prefix: c.prefix, + Tags: splitTags(tf, c.tags), + }, + } + for _, o := range opts { + o(conf) + } + + clone := &Client{ + conn: c.conn, + muted: c.muted || conf.Client.Muted, + rate: conf.Client.Rate, + prefix: conf.Client.Prefix, + tags: joinTags(tf, conf.Client.Tags), + } + clone.conn = c.conn + return clone +} + +// Count adds n to bucket. +func (c *Client) Count(bucket string, n interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, n, "c", c.rate, c.tags) +} + +func (c *Client) skip() bool { + return c.muted || (c.rate != 1 && randFloat() > c.rate) +} + +// Increment increment the given bucket. It is equivalent to Count(bucket, 1). +func (c *Client) Increment(bucket string) { + c.Count(bucket, 1) +} + +// Gauge records an absolute value for the given bucket. +func (c *Client) Gauge(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.gauge(c.prefix, bucket, value, c.tags) +} + +// Timing sends a timing value to a bucket. +func (c *Client) Timing(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags) +} + +// Histogram sends an histogram value to a bucket. +func (c *Client) Histogram(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags) +} + +// A Timing is an helper object that eases sending timing values. +type Timing struct { + start time.Time + c *Client +} + +// NewTiming creates a new Timing. +func (c *Client) NewTiming() Timing { + return Timing{start: now(), c: c} +} + +// Send sends the time elapsed since the creation of the Timing. +func (t Timing) Send(bucket string) { + t.c.Timing(bucket, int(t.Duration()/time.Millisecond)) +} + +// Duration returns the time elapsed since the creation of the Timing. +func (t Timing) Duration() time.Duration { + return now().Sub(t.start) +} + +// Unique sends the given value to a set bucket. +func (c *Client) Unique(bucket string, value string) { + if c.skip() { + return + } + c.conn.unique(c.prefix, bucket, value, c.tags) +} + +// Flush flushes the Client's buffer. +func (c *Client) Flush() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.mu.Unlock() +} + +// Close flushes the Client's buffer and releases the associated ressources. The +// Client and all the cloned Clients must not be used afterward. +func (c *Client) Close() { + if c.muted { + return + } + c.conn.mu.Lock() + c.conn.flush(0) + c.conn.handleError(c.conn.w.Close()) + c.conn.closed = true + c.conn.mu.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 0625b8844..eae907f7f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -2,6 +2,7 @@ "comment": "", "ignore": "test", "package": [ + {"path":"github.com/alexcesaro/statsd","checksumSHA1":"U3+w41n+fYxSL3FsXo0kR7zuGho=","revision":"7fea3f0d2fab1ad973e641e51dba45443a311a90","revisionTime":"2016-03-20T18:21:10Z"}, {"path":"github.com/armon/go-proxyproto","checksumSHA1":"eAall4ACaMG40mzSJ5Oc95GiF1A=","revision":"609d6338d3a76ec26ac3fe7045a164d9a58436e7","revisionTime":"2015-02-06T18:58:55-08:00"}, {"path":"github.com/circonus-labs/circonus-gometrics","checksumSHA1":"ZAdLZ0e/hin4AXxdS9F8y0yi/bg=","revision":"f8c68ed96a065c10344dcaf802f608781fc0a981","revisionTime":"2016-08-30T16:47:25Z"}, {"path":"github.com/circonus-labs/circonus-gometrics/api","checksumSHA1":"2hV0W0wM75u+OJ4kGv0i7B95cmY=","revision":"f8c68ed96a065c10344dcaf802f608781fc0a981","revisionTime":"2016-08-30T16:47:25Z"},