Skip to content

Commit

Permalink
Issue #306: Add TCP metrics
Browse files Browse the repository at this point in the history
This patch adds metrics for the TCP and TCP+SNI proxy.

Fixes #306
  • Loading branch information
magiconair committed Jun 29, 2017
1 parent bbb0ca3 commit 05b7d51
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 35 deletions.
25 changes: 17 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,14 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
}
}

func lookupHostFn(cfg *config.Config) func(string) string {
func lookupHostFn(cfg *config.Config) func(string) *route.Target {
pick := route.Picker[cfg.Proxy.Strategy]
notFound := metrics.DefaultRegistry.GetCounter("notfound")
return func(host string) string {
return func(host string) *route.Target {
t := route.GetTable().LookupHost(host, pick)
if t == nil {
notFound.Inc(1)
log.Print("[WARN] No route for ", host)
return ""
}
return t.URL.Host
return t
}
}

Expand Down Expand Up @@ -252,14 +249,26 @@ func startServers(cfg *config.Config) {
}()
case "tcp":
go func() {
h := &tcp.Proxy{cfg.Proxy.DialTimeout, lookupHostFn(cfg)}
h := &tcp.Proxy{
DialTimeout: cfg.Proxy.DialTimeout,
Lookup: lookupHostFn(cfg),
Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"),
ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"),
Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"),
}
if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil {
exit.Fatal("[FATAL] ", err)
}
}()
case "tcp+sni":
go func() {
h := &tcp.SNIProxy{cfg.Proxy.DialTimeout, lookupHostFn(cfg)}
h := &tcp.SNIProxy{
DialTimeout: cfg.Proxy.DialTimeout,
Lookup: lookupHostFn(cfg),
Conn: metrics.DefaultRegistry.GetCounter("tcp_sni.conn"),
ConnFail: metrics.DefaultRegistry.GetCounter("tcp_sni.connfail"),
Noroute: metrics.DefaultRegistry.GetCounter("tcp_sni.noroute"),
}
if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil {
exit.Fatal("[FATAL] ", err)
}
Expand Down
39 changes: 39 additions & 0 deletions proxy/tcp/copy_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package tcp

import (
"io"

"github.com/fabiolb/fabio/metrics"
)

// copyBuffer is an adapted version of io.copyBuffer which updates a
// counter instead of returning the total bytes written.
func copyBuffer(dst io.Writer, src io.Reader, c metrics.Counter) (err error) {
buf := make([]byte, 32*1024)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
if c != nil {
c.Inc(int64(nw))
}
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return err
}
60 changes: 51 additions & 9 deletions proxy/tcp/sni_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"log"
"net"
"time"

"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
)

// SNIProxy implements an SNI aware transparent TCP proxy which captures the
Expand All @@ -19,58 +22,97 @@ type SNIProxy struct {

// Lookup returns a target host for the given server name.
// The proxy will panic if this value is nil.
Lookup func(host string) string
Lookup func(host string) *route.Target

// Conn counts the number of connections.
Conn metrics.Counter

// ConnFail counts the failed upstream connection attempts.
ConnFail metrics.Counter

// Noroute counts the failed Lookup() calls.
Noroute metrics.Counter
}

func (p *SNIProxy) ServeTCP(in net.Conn) error {
defer in.Close()

if p.Conn != nil {
p.Conn.Inc(1)
}

// capture client hello
data := make([]byte, 1024)
n, err := in.Read(data)
if err != nil {
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}
data = data[:n]

host, ok := readServerName(data)
if !ok {
log.Print("[DEBUG] tcp+sni: TLS handshake failed")
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return nil
}

if host == "" {
log.Print("[DEBUG] tcp+sni: server_name missing")
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return nil
}

addr := p.Lookup(host)
if addr == "" {
t := p.Lookup(host)
if t == nil {
if p.Noroute != nil {
p.Noroute.Inc(1)
}
return nil
}
addr := t.URL.Host

out, err := net.DialTimeout("tcp", addr, p.DialTimeout)
if err != nil {
log.Print("[WARN] tcp+sni: cannot connect to upstream ", addr)
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}
defer out.Close()

// copy client hello
_, err = out.Write(data)
n, err = out.Write(data)
if err != nil {
log.Print("[WARN] tcp+sni: copy client hello failed. ", err)
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}

errc := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
cp := func(dst io.Writer, src io.Reader, c metrics.Counter) {
errc <- copyBuffer(dst, src, c)
}

go cp(out, in)
go cp(in, out)
// rx measures the traffic to the upstream server (in <- out)
// tx measures the traffic from the upstream server (out <- in)
rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx")
tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx")

// we've received the ClientHello already
rx.Inc(int64(n))

go cp(in, out, rx)
go cp(out, in, tx)
err = <-errc
if err != nil && err != io.EOF {
log.Print("[WARN]: tcp+sni: ", err)
Expand Down
45 changes: 36 additions & 9 deletions proxy/tcp/tcp_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"log"
"net"
"time"

"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
)

// Proxy implements a generic TCP proxying handler.
Expand All @@ -13,36 +16,60 @@ type Proxy struct {
// connection.
DialTimeout time.Duration

// Lookup returns a target host for the given server name.
// Lookup returns a target host for the given request.
// The proxy will panic if this value is nil.
Lookup func(host string) string
Lookup func(host string) *route.Target

// Conn counts the number of connections.
Conn metrics.Counter

// ConnFail counts the failed upstream connection attempts.
ConnFail metrics.Counter

// Noroute counts the failed Lookup() calls.
Noroute metrics.Counter
}

func (p *Proxy) ServeTCP(in net.Conn) error {
defer in.Close()

if p.Conn != nil {
p.Conn.Inc(1)
}

_, port, _ := net.SplitHostPort(in.LocalAddr().String())
port = ":" + port
addr := p.Lookup(port)
if addr == "" {
t := p.Lookup(port)
if t == nil {
if p.Noroute != nil {
p.Noroute.Inc(1)
}
return nil
}
addr := t.URL.Host

out, err := net.DialTimeout("tcp", addr, p.DialTimeout)
if err != nil {
log.Print("[WARN] tcp: cannot connect to upstream ", addr)
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}
defer out.Close()

errc := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
cp := func(dst io.Writer, src io.Reader, c metrics.Counter) {
errc <- copyBuffer(dst, src, c)
}

go cp(out, in)
go cp(in, out)
// rx measures the traffic to the upstream server (in <- out)
// tx measures the traffic from the upstream server (out <- in)
rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx")
tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx")

go cp(in, out, rx)
go cp(out, in, tx)
err = <-errc
if err != nil && err != io.EOF {
log.Print("[WARN]: tcp: ", err)
Expand Down
14 changes: 9 additions & 5 deletions proxy/tcp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"io/ioutil"
"net"
"net/url"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -41,10 +42,9 @@ func TestTCPProxy(t *testing.T) {
proxyAddr := "127.0.0.1:57778"
go func() {
h := &tcp.Proxy{
Lookup: func(h string) string {
Lookup: func(h string) *route.Target {
tbl, _ := route.NewTable("route add srv :57778 tcp://" + srv.Addr)
t := tbl.LookupHost(h, route.Picker["rr"])
return t.URL.Host
return tbl.LookupHost(h, route.Picker["rr"])
},
}
l := config.Listen{Addr: proxyAddr}
Expand Down Expand Up @@ -101,7 +101,9 @@ func TestTCPProxyWithTLS(t *testing.T) {
}

h := &tcp.Proxy{
Lookup: func(string) string { return srv.Addr },
Lookup: func(string) *route.Target {
return &route.Target{URL: &url.URL{Host: srv.Addr}}
},
}

l := config.Listen{Addr: proxyAddr}
Expand Down Expand Up @@ -148,7 +150,9 @@ func TestTCPSNIProxy(t *testing.T) {
proxyAddr := "127.0.0.1:57778"
go func() {
h := &tcp.SNIProxy{
Lookup: func(string) string { return srv.Addr },
Lookup: func(string) *route.Target {
return &route.Target{URL: &url.URL{Host: srv.Addr}}
},
}
l := config.Listen{Addr: proxyAddr}
if err := ListenAndServeTCP(l, h, nil); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (r *Route) addTarget(service string, targetURL *url.URL, fixedWeight float6
URL: targetURL,
FixedWeight: fixedWeight,
Timer: ServiceRegistry.GetTimer(name),
timerName: name,
TimerName: name,
}
if r.Opts != nil {
t.StripPath = r.Opts["strip"]
Expand Down
2 changes: 1 addition & 1 deletion route/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func syncRegistry(t Table) {
for _, routes := range t {
for _, r := range routes {
for _, tg := range r.Targets {
timers[tg.timerName] = true
timers[tg.TimerName] = true
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions route/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ type Target struct {
// Timer measures throughput and latency of this target
Timer metrics.Timer

// timerName is the name of the timer in the metrics registry
timerName string
// TimerName is the name of the timer in the metrics registry
TimerName string
}

0 comments on commit 05b7d51

Please sign in to comment.