Skip to content

Commit

Permalink
Abstract metrics to generic instrumentation interface
Browse files Browse the repository at this point in the history
  • Loading branch information
adamwasila committed Oct 16, 2019
1 parent 591861d commit 4b8012b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 36 deletions.
60 changes: 30 additions & 30 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,44 @@ import (
)

type metrics struct {
enabled bool
activeConnections int64
activeConnectionsMetric metric.Metric
}

func (m *metrics) activeConnectionAdd() {
if m.enabled {
val := atomic.AddInt64(&m.activeConnections, 1)
m.activeConnectionsMetric.Add(float64(val))
}
func (m *metrics) ConnectionOpened(id string) {
val := atomic.AddInt64(&m.activeConnections, 1)
m.activeConnectionsMetric.Add(float64(val))
}

func (m *metrics) activeConnectionRemove() {
if m.enabled {
val := atomic.AddInt64(&m.activeConnections, -1)
m.activeConnectionsMetric.Add(float64(val))
}
func (m *metrics) ConnectionClosedUpstream(id string) {
}

func (m *metrics) ConnectionClosedDownstream(id string) {

}

func (m *metrics) ConnectionClosed(id string) {
val := atomic.AddInt64(&m.activeConnections, -1)
m.activeConnectionsMetric.Add(float64(val))
}

func (m *metrics) init(adminPort int) {
if m.enabled {
go func() {
http.Handle("/", MainPageHandler())
http.Handle("/debug/metrics", metric.Handler(metric.Exposed))
logrus.WithField("port", adminPort).Infof("Start admin console")
server := &http.Server{Addr: fmt.Sprintf(":%d", adminPort), Handler: http.DefaultServeMux}
registerShutdownHook(func() {
err := server.Shutdown(context.Background())
if err != nil {
logrus.Errorf("Shutdown of admin console unclean: [%s]", err)
}
})
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logrus.Infof("HTTP handler closed with error: %s", err)
go func() {
http.Handle("/", MainPageHandler())
http.Handle("/debug/metrics", metric.Handler(metric.Exposed))
logrus.WithField("port", adminPort).Infof("Start admin console")
server := &http.Server{Addr: fmt.Sprintf(":%d", adminPort), Handler: http.DefaultServeMux}
registerShutdownHook(func() {
err := server.Shutdown(context.Background())
if err != nil {
logrus.Errorf("Shutdown of admin console unclean: [%s]", err)
}
}()
m.activeConnectionsMetric = metric.NewGauge("15m1m")
expvar.Publish("connections:active", m.activeConnectionsMetric)
}
})
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logrus.Infof("HTTP handler closed with error: %s", err)
}
}()
m.activeConnectionsMetric = metric.NewGauge("15m1m")
expvar.Publish("connections:active", m.activeConnectionsMetric)
}
33 changes: 27 additions & 6 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ import (
"github.com/spf13/pflag"
)

type instrumentation interface {
ConnectionOpened(id string)
ConnectionClosedUpstream(id string)
ConnectionClosedDownstream(id string)
ConnectionClosed(id string)
}

type nopInstrumentation struct{}

func (*nopInstrumentation) ConnectionOpened(id string) {}
func (*nopInstrumentation) ConnectionClosedUpstream(id string) {}
func (*nopInstrumentation) ConnectionClosedDownstream(id string) {}
func (*nopInstrumentation) ConnectionClosed(id string) {}

func main() {
setupGracefulStop()

Expand All @@ -28,13 +42,13 @@ func main() {
var closeChance, throttleChance float64
var verbose bool
var veryVerbose bool
var m metrics
var metricsEnabled bool

pflag.StringVarP(&bind, "bind", "b", "127.0.0.1:9998", "Address to bind listening socket to")
pflag.StringVarP(&upstream, "upstream", "u", "127.0.0.1:8000", "<host>[:port] of upstream service")
pflag.IntVarP(&rate, "rate", "r", -1, "Maximum data rate of bytes per second if throttling applied (see --throttle-chance)")
pflag.DurationVarP(&delay, "delay", "d", 0, "Initial delay when connection starts to deteriorate")
pflag.BoolVarP(&m.enabled, "admin", "a", false, "Enable admin console service")
pflag.BoolVarP(&metricsEnabled, "admin", "a", false, "Enable admin console service")
pflag.IntVarP(&adminPort, "admin-port", "p", 6000, "Port for admin console service")
pflag.BoolVarP(&verbose, "verbose", "v", false, "Enable verbose output (debug logs)")
pflag.BoolVarP(&veryVerbose, "very-verbose", "w", false, "Enable very verbose output (trace logs)")
Expand All @@ -50,6 +64,13 @@ func main() {

pflag.Parse()

var i instrumentation = &nopInstrumentation{}
if metricsEnabled {
m := metrics{}
m.init(adminPort)
i = &m
}

throttleChance += closeChance

if throttleChance < 0.0 || closeChance < 0.0 {
Expand Down Expand Up @@ -78,8 +99,6 @@ func main() {
"admin-port": adminPort,
}).Debugf("Config found")

m.init(adminPort)

addr, err := net.ResolveTCPAddr("tcp", bind)
if err != nil {
logrus.Errorln("Could not resolve", err)
Expand Down Expand Up @@ -144,7 +163,7 @@ func main() {
}
log.Debugf("New connection")

m.activeConnectionAdd()
i.ConnectionOpened(name)

once := sync.Once{}
connectionCloser := func() {
Expand All @@ -159,7 +178,7 @@ func main() {
if err != nil {
log.WithError(err).Warnf("Error while closing connection")
}
m.activeConnectionRemove()
i.ConnectionClosed(name)
})
}
hookID := registerShutdownHook(connectionCloser)
Expand All @@ -182,13 +201,15 @@ func main() {
handleConnection(log, throttle, close, rate, delay, conn, ups)
if rate != 0 {
closeSingleSide(log, conn, ups)
i.ConnectionClosedUpstream(name)
}
},
func() {
log = log.WithField("direction", "downstream")
handleConnection(log, throttle, close, rate, delay, ups, conn)
if rate != 0 {
closeSingleSide(log, ups, conn)
i.ConnectionClosedDownstream(name)
}
},
),
Expand Down

0 comments on commit 4b8012b

Please sign in to comment.