From 7922c1542e9fbae83b24c0ab9649919ea62c4aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Tue, 26 Sep 2023 20:10:12 +0200 Subject: [PATCH] feature: routesrv add prometheus metrics feature: routesrv enable profiling feature: routesrv split supportListener for metrics and profiling from the main listener feature: routesrv shutdown supportServer gracefully test: routesrv shutdown main listener and supportListener test: add more routesrv coverage to reach close to 80% MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- routesrv/eskipbytes.go | 52 ++++++++++- routesrv/polling.go | 42 +++------ routesrv/redishandler.go | 4 +- routesrv/routesrv.go | 118 ++++++++++++++++++------ routesrv/routesrv_test.go | 15 +++ routesrv/shutdown_test.go | 187 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 361 insertions(+), 57 deletions(-) create mode 100644 routesrv/shutdown_test.go diff --git a/routesrv/eskipbytes.go b/routesrv/eskipbytes.go index 620b2415d7..37ebdc664a 100644 --- a/routesrv/eskipbytes.go +++ b/routesrv/eskipbytes.go @@ -11,10 +11,41 @@ import ( ot "github.com/opentracing/opentracing-go" "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/tracing" ) +type responseWriterInterceptor struct { + http.ResponseWriter + statusCode int + bytesWritten int +} + +func (w *responseWriterInterceptor) WriteHeader(statusCode int) { + w.statusCode = statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +func (w *responseWriterInterceptor) Header() http.Header { + return w.ResponseWriter.Header() +} + +func (w *responseWriterInterceptor) Write(p []byte) (int, error) { + w.bytesWritten += len(p) + return w.ResponseWriter.Write(p) +} + +// Unwrap will be used by ResponseController, so if they will use that +// to get the ResponseWrite for some reason they can do it. +func (w *responseWriterInterceptor) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} + +var ( + _ http.ResponseWriter = &responseWriterInterceptor{} +) + // eskipBytes keeps eskip-formatted routes as a byte slice and // provides synchronized r/w access to them. Additionally it can // serve as an HTTP handler exposing its content. @@ -26,8 +57,9 @@ type eskipBytes struct { count int mu sync.RWMutex - tracer ot.Tracer - now func() time.Time + tracer ot.Tracer + metrics metrics.Metrics + now func() time.Time } // formatAndSet takes a slice of routes and stores them eskip-formatted @@ -54,9 +86,23 @@ func (e *eskipBytes) formatAndSet(routes []*eskip.Route) (_ int, _ string, initi return len(e.data), e.etag, initialized, updated } -func (e *eskipBytes) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) { span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer) defer span.Finish() + start := time.Now() + defer e.metrics.MeasureBackend("routersv", start) + + w := &responseWriterInterceptor{ + ResponseWriter: rw, + statusCode: http.StatusOK, + } + + defer func() { + span.SetTag("status", w.statusCode) + span.SetTag("bytes", w.bytesWritten) + + e.metrics.IncCounter(strconv.Itoa(w.statusCode)) + }() if r.Method != "GET" && r.Method != "HEAD" { w.WriteHeader(http.StatusMethodNotAllowed) diff --git a/routesrv/polling.go b/routesrv/polling.go index b262e5a4dd..00895cf0ac 100644 --- a/routesrv/polling.go +++ b/routesrv/polling.go @@ -8,11 +8,10 @@ import ( "time" ot "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/tracing" ) @@ -26,24 +25,6 @@ const ( LogRoutesUpdated = "routes updated" ) -var ( - pollingStarted = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "polling_started_timestamp", - Help: "UNIX time when the routes polling has started", - }) - routesInitialized = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_initialized_timestamp", - Help: "UNIX time when the first routes were received and stored", - }) - routesUpdated = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "routesrv", - Name: "routes_updated_timestamp", - Help: "UNIX time of the last routes update (initial load counts as well)", - }) -) - type poller struct { client routing.DataClient b *eskipBytes @@ -56,8 +37,9 @@ type poller struct { editRoute []*eskip.Editor cloneRoute []*eskip.Clone - // tracer - tracer ot.Tracer + // visibility + tracer ot.Tracer + metrics metrics.Metrics } func (p *poller) poll(wg *sync.WaitGroup) { @@ -66,21 +48,20 @@ func (p *poller) poll(wg *sync.WaitGroup) { log.WithField("timeout", p.timeout).Info(LogPollingStarted) ticker := time.NewTicker(p.timeout) defer ticker.Stop() - pollingStarted.SetToCurrentTime() + p.setGaugeToCurrentTime("polling_started_timestamp") var lastRoutesById map[string]string for { span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer) routes, err := p.client.LoadAll() - routes = p.process(routes) - routesCount := len(routes) switch { case err != nil: log.WithError(err).Error(LogRoutesFetchingFailed) + p.metrics.IncCounter("routes.fetch_errors") span.SetTag("error", true) span.LogKV( @@ -89,6 +70,7 @@ func (p *poller) poll(wg *sync.WaitGroup) { ) case routesCount == 0: log.Error(LogRoutesEmpty) + p.metrics.IncCounter("routes.empty") span.SetTag("error", true) span.LogKV( @@ -101,12 +83,14 @@ func (p *poller) poll(wg *sync.WaitGroup) { if initialized { logger.Info(LogRoutesInitialized) span.SetTag("routes.initialized", true) - routesInitialized.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.initialized_timestamp") } if updated { logger.Info(LogRoutesUpdated) span.SetTag("routes.updated", true) - routesUpdated.SetToCurrentTime() + p.setGaugeToCurrentTime("routes.updated_timestamp") + p.metrics.UpdateGauge("routes.total", float64(routesCount)) + p.metrics.UpdateGauge("routes.byte", float64(routesBytes)) } span.SetTag("routes.count", routesCount) span.SetTag("routes.bytes", routesBytes) @@ -154,6 +138,10 @@ func (p *poller) process(routes []*eskip.Route) []*eskip.Route { return routes } +func (p *poller) setGaugeToCurrentTime(name string) { + p.metrics.UpdateGauge(name, (float64(time.Now().UnixNano()) / 1e9)) +} + func mapRoutes(routes []*eskip.Route) map[string]string { byId := make(map[string]string) for _, r := range routes { diff --git a/routesrv/redishandler.go b/routesrv/redishandler.go index 1a4fc10ea9..45ad66e715 100644 --- a/routesrv/redishandler.go +++ b/routesrv/redishandler.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/zalando/skipper/dataclients/kubernetes" + "github.com/zalando/skipper/metrics" ) type RedisHandler struct { @@ -37,10 +38,11 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(address) } -func getRedisAddresses(namespace, name string, kdc *kubernetes.Client) func() ([]byte, error) { +func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { return func() ([]byte, error) { a := kdc.GetEndpointAddresses(namespace, name) log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a) + m.UpdateGauge("redis_endpoints", float64(len(a))) result := RedisEndpoints{} for i := 0; i < len(a); i++ { diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go index d333411199..a4a6aeb1a7 100644 --- a/routesrv/routesrv.go +++ b/routesrv/routesrv.go @@ -9,21 +9,24 @@ import ( "syscall" "time" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/filters/auth" + "github.com/zalando/skipper/metrics" "github.com/zalando/skipper/tracing" ) // RouteServer is used to serve eskip-formatted routes, // that originate from the polled data source. type RouteServer struct { - server *http.Server - poller *poller - wg *sync.WaitGroup + metrics metrics.Metrics + server *http.Server + supportServer *http.Server + poller *poller + wg *sync.WaitGroup } // New returns an initialized route server according to the passed options. @@ -31,7 +34,27 @@ type RouteServer struct { // will stay in an uninitialized state, till StartUpdates is called and // in effect data source is queried and routes initialized/updated. func New(opts skipper.Options) (*RouteServer, error) { - rs := &RouteServer{} + if opts.PrometheusRegistry == nil { + opts.PrometheusRegistry = prometheus.NewRegistry() + } + + mopt := metrics.Options{ + Format: metrics.PrometheusKind, + Prefix: "routesrv", + PrometheusRegistry: opts.PrometheusRegistry, + EnableDebugGcMetrics: true, + EnableRuntimeMetrics: true, + EnableProfile: opts.EnableProfile, + BlockProfileRate: opts.BlockProfileRate, + MutexProfileFraction: opts.MutexProfileFraction, + MemProfileRate: opts.MemProfileRate, + } + m := metrics.NewMetrics(mopt) + metricsHandler := metrics.NewHandler(mopt, m) + + rs := &RouteServer{ + metrics: m, + } opentracingOpts := opts.OpenTracing if len(opentracingOpts) == 0 { @@ -42,12 +65,25 @@ func New(opts skipper.Options) (*RouteServer, error) { return nil, err } - b := &eskipBytes{tracer: tracer, now: time.Now} - bs := &eskipBytesStatus{b: b} - handler := http.NewServeMux() - handler.Handle("/health", bs) - handler.Handle("/routes", b) - handler.Handle("/metrics", promhttp.Handler()) + b := &eskipBytes{ + tracer: tracer, + metrics: m, + now: time.Now, + } + bs := &eskipBytesStatus{ + b: b, + } + mux := http.NewServeMux() + mux.Handle("/health", bs) + mux.Handle("/routes", b) + supportHandler := http.NewServeMux() + supportHandler.Handle("/metrics", metricsHandler) + supportHandler.Handle("/metrics/", metricsHandler) + + if opts.EnableProfile { + supportHandler.Handle("/debug/pprof", metricsHandler) + supportHandler.Handle("/debug/pprof/", metricsHandler) + } dataclient, err := kubernetes.New(opts.KubernetesDataClientOptions()) if err != nil { @@ -68,13 +104,20 @@ func New(opts skipper.Options) (*RouteServer, error) { if err != nil { return nil, err } - rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient) - handler.Handle("/swarm/redis/shards", rh) + rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient, m) + mux.Handle("/swarm/redis/shards", rh) } rs.server = &http.Server{ Addr: opts.Address, - Handler: handler, + Handler: mux, + ReadTimeout: 1 * time.Minute, + ReadHeaderTimeout: 1 * time.Minute, + } + + rs.supportServer = &http.Server{ + Addr: opts.SupportListener, + Handler: supportHandler, ReadTimeout: 1 * time.Minute, ReadHeaderTimeout: 1 * time.Minute, } @@ -89,6 +132,7 @@ func New(opts skipper.Options) (*RouteServer, error) { cloneRoute: opts.CloneRoute, oauth2Config: oauthConfig, tracer: tracer, + metrics: m, } rs.wg = &sync.WaitGroup{} @@ -114,6 +158,15 @@ func (rs *RouteServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { rs.server.Handler.ServeHTTP(w, r) } +func (rs *RouteServer) startSupportListener() { + if rs.supportServer != nil { + err := rs.supportServer.ListenAndServe() + if err != nil { + log.Errorf("Failed support listener: %v", err) + } + } +} + func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { once := sync.Once{} rs.wg.Add(1) @@ -125,6 +178,12 @@ func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { log.Infof("shutting down the server in %s...", delay) time.Sleep(delay) + if rs.supportServer != nil { + if err := rs.supportServer.Shutdown(context.Background()); err != nil { + log.Error("unable to shut down the support server: ", err) + } + log.Info("supportServer shut down") + } if err := rs.server.Shutdown(context.Background()); err != nil { log.Error("unable to shut down the server: ", err) } @@ -133,21 +192,11 @@ func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { } } -// Run starts a route server set up according to the passed options. -// It is a blocking call designed to be used as a single call/entry point, -// when running the route server as a standalone binary. It returns, when -// the server is closed, which can happen due to server startup errors or -// gracefully handled SIGTERM signal. In case of a server startup error, -// the error is returned as is. -func Run(opts skipper.Options) error { - rs, err := New(opts) - if err != nil { - return err - } +func run(rs *RouteServer, opts skipper.Options, sigs chan os.Signal) error { + var err error shutdown := newShutdownFunc(rs) - sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) go func() { <-sigs @@ -156,6 +205,7 @@ func Run(opts skipper.Options) error { rs.StartUpdates() + go rs.startSupportListener() if err = rs.server.ListenAndServe(); err != http.ErrServerClosed { go shutdown(0) } else { @@ -166,3 +216,19 @@ func Run(opts skipper.Options) error { return err } + +// Run starts a route server set up according to the passed options. +// It is a blocking call designed to be used as a single call/entry point, +// when running the route server as a standalone binary. It returns, when +// the server is closed, which can happen due to server startup errors or +// gracefully handled SIGTERM signal. In case of a server startup error, +// the error is returned as is. +func Run(opts skipper.Options) error { + rs, err := New(opts) + if err != nil { + return err + } + sigs := make(chan os.Signal, 1) + return run(rs, opts, sigs) + +} diff --git a/routesrv/routesrv_test.go b/routesrv/routesrv_test.go index c92bb3ccf9..e1ed960ff2 100644 --- a/routesrv/routesrv_test.go +++ b/routesrv/routesrv_test.go @@ -16,6 +16,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" "github.com/zalando/skipper/eskip" @@ -109,6 +110,14 @@ func getRoutes(rs *routesrv.RouteServer) *httptest.ResponseRecorder { return w } +func getHealth(rs *routesrv.RouteServer) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/health", nil) + rs.ServeHTTP(w, r) + + return w +} + func getRedisURLs(rs *routesrv.RouteServer) *httptest.ResponseRecorder { w := httptest.NewRecorder() r := httptest.NewRequest("GET", "/swarm/redis/shards", nil) @@ -171,6 +180,9 @@ func TestNotInitializedRoutesAreNotServed(t *testing.T) { t.Error("uninitialized routes were served") } wantHTTPCode(t, w, http.StatusNotFound) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusServiceUnavailable) } func TestEmptyRoutesAreNotServed(t *testing.T) { @@ -218,6 +230,9 @@ func TestFetchedRoutesAreServedInEskipFormat(t *testing.T) { t.Errorf("served routes do not reflect kubernetes resources: %s", cmp.Diff(got, want)) } wantHTTPCode(t, w, http.StatusOK) + + w = getHealth(rs) + wantHTTPCode(t, w, http.StatusNoContent) } func TestRedisEndpointSlices(t *testing.T) { diff --git a/routesrv/shutdown_test.go b/routesrv/shutdown_test.go new file mode 100644 index 0000000000..4ce164be6c --- /dev/null +++ b/routesrv/shutdown_test.go @@ -0,0 +1,187 @@ +package routesrv + +import ( + "bytes" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/zalando/skipper" + "github.com/zalando/skipper/dataclients/kubernetes/kubernetestest" +) + +type muxHandler struct { + handler http.Handler + mu sync.RWMutex +} + +func (m *muxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.mu.RLock() + defer m.mu.RUnlock() + + m.handler.ServeHTTP(w, r) +} + +func newKubeAPI(t *testing.T, specs ...io.Reader) http.Handler { + t.Helper() + api, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, specs...) + if err != nil { + t.Fatalf("cannot initialize kubernetes api: %s", err) + } + return api +} +func newKubeServer(t *testing.T, specs ...io.Reader) (*httptest.Server, *muxHandler) { + t.Helper() + handler := &muxHandler{handler: newKubeAPI(t, specs...)} + return httptest.NewUnstartedServer(handler), handler +} + +func loadKubeYAML(t *testing.T, path string) io.Reader { + t.Helper() + y, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to open kubernetes resources fixture %s: %v", path, err) + } + return bytes.NewBuffer(y) +} + +func findAddress() (string, error) { + l, err := net.ListenTCP("tcp6", &net.TCPAddr{}) + if err != nil { + return "", err + } + + defer l.Close() + return l.Addr().String(), nil +} + +func TestServerShutdownHTTP(t *testing.T) { + ks, _ := newKubeServer(t, loadKubeYAML(t, "testdata/lb-target-multi.yaml")) + defer ks.Close() + ks.Start() + + o := skipper.Options{ + KubernetesURL: "http://" + ks.Listener.Addr().String(), + SourcePollTimeout: 500 * time.Millisecond, + } + const shutdownDelay = 1 * time.Second + + address, err := findAddress() + if err != nil { + t.Fatalf("Failed to find address: %v", err) + } + supportAddress, err := findAddress() + if err != nil { + t.Fatalf("Failed to find supportAddress: %v", err) + } + + o.Address, o.SupportListener, o.WaitForHealthcheckInterval = address, supportAddress, shutdownDelay + baseURL := "http://" + address + supportBaseURL := "http://" + supportAddress + testEndpoints := []string{baseURL + "/routes", supportBaseURL + "/metrics"} + + t.Logf("kube endpoint: %q", o.KubernetesURL) + for _, u := range testEndpoints { + t.Logf("test endpoint: %q", u) + } + + rs, err := New(o) + if err != nil { + t.Fatalf("Failed to create a routesrv: %v", err) + } + + time.Sleep(o.SourcePollTimeout * 2) + + cli := http.Client{ + Timeout: time.Second, + } + rsp, err := cli.Get(o.KubernetesURL + "/api/v1/services") + if err != nil { + t.Fatalf("Failed to get %q: %v", o.KubernetesURL, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get status OK for %q: %d", o.KubernetesURL, rsp.StatusCode) + } + + sigs := make(chan os.Signal, 1) + + errCh := make(chan error) + go func() { + err := run(rs, o, sigs) + if err != nil { + errCh <- err + } + }() + + // make sure we started all listeners correctly + for i := 0; i < 5; i++ { + var ( + err error + rsp *http.Response + ) + + for _, u := range testEndpoints { + rsp, err = http.DefaultClient.Get(u) + if err != nil { + err = fmt.Errorf("failed to get %q: %v", u, err) + time.Sleep(10 * time.Millisecond) + continue + } + if rsp.StatusCode != 200 { + err = fmt.Errorf("failed to get expected status code 200 for %q, got: %d", u, rsp.StatusCode) + + time.Sleep(10 * time.Millisecond) + continue + } + err = nil + } + if i == 4 && err != nil { + t.Fatalf("Failed to get %v", err) + } + } + + // initiate shutdown + sigs <- syscall.SIGTERM + + // test that we can fetch even within termination + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + rsp, err := http.DefaultClient.Get(u) + if err != nil { + t.Fatalf("Failed to get %q after SIGTERM: %v", u, err) + } + if rsp.StatusCode != 200 { + t.Fatalf("Failed to get expected status code 200 for %q after SIGTERM, got: %d", u, rsp.StatusCode) + } + } + + // test that we get connection refused after shutdown + time.Sleep(shutdownDelay / 2) + + for _, u := range testEndpoints { + _, err = http.DefaultClient.Get(u) + switch err { + case nil: + t.Fatalf("Failed to get error as expected: %q", u) + default: + if e := err.Error(); !strings.Contains(e, "refused") { + t.Fatalf("Failed to get connection refused, got: %s", e) + } + } + } + + select { + case err := <-errCh: + t.Fatalf("Failed to shutdown: %v", err) + default: + } +}