Skip to content

Commit

Permalink
feature: routesrv add prometheus metrics
Browse files Browse the repository at this point in the history
feature: routesrv enable profiling
feature: routesrv split supportListener for metrics and profiling from the main listener
feature: routesrv shutdown supportServer gracefully
test: routesrv shutdown main listener and supportListener
test: add more routesrv coverage to reach close to 80%

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs committed Oct 31, 2023
1 parent 4ae47cd commit 7922c15
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 57 deletions.
52 changes: 49 additions & 3 deletions routesrv/eskipbytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,41 @@ import (

ot "github.com/opentracing/opentracing-go"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/tracing"
)

// responseWriterInterceptor wraps an http.ResponseWriter and records the
// status code passed to WriteHeader and the number of body bytes the
// underlying writer reported as written.
//
// The zero value of statusCode is 0; callers that want an implicit 200
// (when the handler never calls WriteHeader) must initialize statusCode
// themselves when constructing the interceptor.
type responseWriterInterceptor struct {
	http.ResponseWriter
	statusCode   int
	bytesWritten int
}

// WriteHeader remembers statusCode and forwards it to the wrapped writer.
func (w *responseWriterInterceptor) WriteHeader(statusCode int) {
	w.statusCode = statusCode
	w.ResponseWriter.WriteHeader(statusCode)
}

// Header returns the header map of the wrapped writer.
func (w *responseWriterInterceptor) Header() http.Header {
	return w.ResponseWriter.Header()
}

// Write forwards p to the wrapped writer and adds the number of bytes the
// wrapped writer actually accepted to bytesWritten. Counting the returned n
// instead of len(p) keeps the counter accurate on short or failed writes.
func (w *responseWriterInterceptor) Write(p []byte) (int, error) {
	n, err := w.ResponseWriter.Write(p)
	w.bytesWritten += n
	return n, err
}

// Unwrap returns the wrapped http.ResponseWriter. It exists so that
// http.ResponseController (or any other caller) can reach the original
// ResponseWriter behind the interceptor.
func (w *responseWriterInterceptor) Unwrap() http.ResponseWriter {
	return w.ResponseWriter
}

// Compile-time check that the interceptor satisfies http.ResponseWriter.
var _ http.ResponseWriter = (*responseWriterInterceptor)(nil)

// eskipBytes keeps eskip-formatted routes as a byte slice and
// provides synchronized r/w access to them. Additionally it can
// serve as an HTTP handler exposing its content.
Expand All @@ -26,8 +57,9 @@ type eskipBytes struct {
count int
mu sync.RWMutex

tracer ot.Tracer
now func() time.Time
tracer ot.Tracer
metrics metrics.Metrics
now func() time.Time
}

// formatAndSet takes a slice of routes and stores them eskip-formatted
Expand All @@ -54,9 +86,23 @@ func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initi
return len(e.data), e.etag, initialized, updated
}

func (e *eskipBytes) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer)
defer span.Finish()
start := time.Now()
defer e.metrics.MeasureBackend("routersv", start)

w := &responseWriterInterceptor{
ResponseWriter: rw,
statusCode: http.StatusOK,
}

defer func() {
span.SetTag("status", w.statusCode)
span.SetTag("bytes", w.bytesWritten)

e.metrics.IncCounter(strconv.Itoa(w.statusCode))
}()

if r.Method != "GET" && r.Method != "HEAD" {
w.WriteHeader(http.StatusMethodNotAllowed)
Expand Down
42 changes: 15 additions & 27 deletions routesrv/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"time"

ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters/auth"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/tracing"
)
Expand All @@ -26,24 +25,6 @@ const (
LogRoutesUpdated = "routes updated"
)

// Package-level gauges created with promauto.NewGauge in the "routesrv"
// namespace. Each one holds a UNIX timestamp and is set by the poller via
// SetToCurrentTime at the corresponding point of the polling loop.
var (
// pollingStarted is set once, when the polling loop begins.
pollingStarted = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "polling_started_timestamp",
Help: "UNIX time when the routes polling has started",
})
// routesInitialized is set when the first routes were received and stored.
routesInitialized = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "routes_initialized_timestamp",
Help: "UNIX time when the first routes were received and stored",
})
// routesUpdated is set on every routes update, including the initial load.
routesUpdated = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "routesrv",
Name: "routes_updated_timestamp",
Help: "UNIX time of the last routes update (initial load counts as well)",
})
)

type poller struct {
client routing.DataClient
b *eskipBytes
Expand All @@ -56,8 +37,9 @@ type poller struct {
editRoute []*eskip.Editor
cloneRoute []*eskip.Clone

// tracer
tracer ot.Tracer
// visibility
tracer ot.Tracer
metrics metrics.Metrics
}

func (p *poller) poll(wg *sync.WaitGroup) {
Expand All @@ -66,21 +48,20 @@ func (p *poller) poll(wg *sync.WaitGroup) {
log.WithField("timeout", p.timeout).Info(LogPollingStarted)
ticker := time.NewTicker(p.timeout)
defer ticker.Stop()
pollingStarted.SetToCurrentTime()
p.setGaugeToCurrentTime("polling_started_timestamp")

var lastRoutesById map[string]string
for {
span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer)

routes, err := p.client.LoadAll()

routes = p.process(routes)

routesCount := len(routes)

switch {
case err != nil:
log.WithError(err).Error(LogRoutesFetchingFailed)
p.metrics.IncCounter("routes.fetch_errors")

span.SetTag("error", true)
span.LogKV(
Expand All @@ -89,6 +70,7 @@ func (p *poller) poll(wg *sync.WaitGroup) {
)
case routesCount == 0:
log.Error(LogRoutesEmpty)
p.metrics.IncCounter("routes.empty")

span.SetTag("error", true)
span.LogKV(
Expand All @@ -101,12 +83,14 @@ func (p *poller) poll(wg *sync.WaitGroup) {
if initialized {
logger.Info(LogRoutesInitialized)
span.SetTag("routes.initialized", true)
routesInitialized.SetToCurrentTime()
p.setGaugeToCurrentTime("routes.initialized_timestamp")
}
if updated {
logger.Info(LogRoutesUpdated)
span.SetTag("routes.updated", true)
routesUpdated.SetToCurrentTime()
p.setGaugeToCurrentTime("routes.updated_timestamp")
p.metrics.UpdateGauge("routes.total", float64(routesCount))
p.metrics.UpdateGauge("routes.byte", float64(routesBytes))
}
span.SetTag("routes.count", routesCount)
span.SetTag("routes.bytes", routesBytes)
Expand Down Expand Up @@ -154,6 +138,10 @@ func (p *poller) process(routes []*eskip.Route) []*eskip.Route {
return routes
}

// setGaugeToCurrentTime updates the gauge identified by name on the poller's
// metrics backend with the current wall-clock time, expressed as fractional
// seconds since the UNIX epoch.
func (p *poller) setGaugeToCurrentTime(name string) {
	nowNanos := time.Now().UnixNano()
	// Convert nanoseconds to seconds, keeping sub-second precision.
	seconds := float64(nowNanos) / 1e9
	p.metrics.UpdateGauge(name, seconds)
}

func mapRoutes(routes []*eskip.Route) map[string]string {
byId := make(map[string]string)
for _, r := range routes {
Expand Down
4 changes: 3 additions & 1 deletion routesrv/redishandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/dataclients/kubernetes"
"github.com/zalando/skipper/metrics"
)

type RedisHandler struct {
Expand Down Expand Up @@ -37,10 +38,11 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(address)
}

func getRedisAddresses(namespace, name string, kdc *kubernetes.Client) func() ([]byte, error) {
func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) {
return func() ([]byte, error) {
a := kdc.GetEndpointAddresses(namespace, name)
log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a)
m.UpdateGauge("redis_endpoints", float64(len(a)))

result := RedisEndpoints{}
for i := 0; i < len(a); i++ {
Expand Down
Loading

0 comments on commit 7922c15

Please sign in to comment.