diff --git a/CHANGELOG.md b/CHANGELOG.md index d325eabc0e8..e0e1f4ccc4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased +### Added +- '/-/healthy' and `/-/ready` endpoint to all node types. + ## [v0.2.0](https://github.com/improbable-eng/thanos/releases/tag/v0.2.0) - 2018.12.10 Next Thanos release adding support to new discovery method, gRPC mTLS and two new object store providers (Swift and Azure). - + Note lots of necessary breaking changes in flags that relates to bucket configuration. ### Deprecated @@ -39,7 +42,7 @@ Note lots of necessary breaking changes in flags that relates to bucket configur ### Changed - *breaking*: Added `thanos_` prefix to memberlist (gossip) metrics. Make sure to update your dashboards and rules. -- S3 provider: +- S3 provider: - Set `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation. ### Added @@ -53,7 +56,7 @@ Note lots of necessary breaking changes in flags that relates to bucket configur - In `thanos query`, file based discovery of store nodes using `--store.file-sd-config.files` - `/-/healthy` endpoint to Querier. - DNS service discovery to static and file based configurations using the `dns+` and `dnssrv+` prefixes for the respective lookup. Details [here](/docs/thanos_service_discovery.md) -- `--cluster.disable` flag to disable gossip functionality completely. +- `--cluster.disable` flag to disable gossip functionality completely. - Hidden flag to configure max compaction level. - Azure Storage. - OpenStack Swift support. diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5a483efe3a2..8dec020575c 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -274,7 +274,11 @@ func runCompact( cancel() }) - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { + readinessFunc := func() (bool, error) { + return true, nil + } + + if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component, readinessFunc); err != nil { return err } diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b0bb1f75d36..fc5d5f6b764 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -212,6 +212,38 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) } +func registerHealthy(mux *http.ServeMux, logger log.Logger, component string) { + mux.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if _, err := fmt.Fprintf(w, "Thanos %v is Healthy.\n", component); err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + level.Error(logger).Log("msg", "Could not write health check response.") + } + }) +} + +func registerReady(mux *http.ServeMux, logger log.Logger, component string, readinessFunc func() (bool, error)) { + mux.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { + ready, err := readinessFunc() + if err != nil { + level.Error(logger).Log("msg", "Failed to get readiness status.") + ready = false + } + + if ready { + w.WriteHeader(http.StatusOK) + if _, err := fmt.Fprintf(w, "Thanos %v is ready.\n", component); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + return + } + w.WriteHeader(http.StatusServiceUnavailable) + if _, err := fmt.Fprintf(w, "Thanos %v is not ready.\n", component); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + }) +} + // defaultGRPCServerOpts returns default gRPC server opts that includes: // - request histogram // - tracing @@ -294,10 +326,12 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o } // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. -func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error { +func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, component string, readinessFunc func() (bool, error)) error { mux := http.NewServeMux() registerMetrics(mux, reg) registerProfile(mux) + registerHealthy(mux, logger, component) + registerReady(mux, logger, component, readinessFunc) l, err := net.Listen("tcp", httpBindAddr) if err != nil { diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go new file mode 100644 index 00000000000..f66f920b27a --- /dev/null +++ b/cmd/thanos/main_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" +) + +func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil) + testutil.Ok(t, err) + return http.DefaultClient.Do(req.WithContext(ctx)) +} + +func isHealthy() (bool, error) { + return true, nil +} + +func TestGenericHttpEndpoints(t *testing.T) { + var g run.Group + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) + metricsRegistry := prometheus.NewRegistry() + component := "sidecar" + ctx := context.Background() + + freePort, err := testutil.FreePort() + testutil.Ok(t, err) + + serverAddress := fmt.Sprintf("127.0.0.1:%d", freePort) + + err = metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, component, isHealthy) + testutil.Ok(t, err) + go func() { _ = g.Run() }() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/healthy")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/ready")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 1a74f84e83f..6eefe7c6497 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -245,6 +245,8 @@ func runQuery( fileSD *file.Discovery, dnsSDInterval time.Duration, ) error { + var querierIsReady = false + duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_query_duplicated_store_address", Help: "The number of times a duplicated store addresses is detected from the different configs in query", @@ -386,11 +388,25 @@ func runQuery( router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil { + if _, err := fmt.Fprintf(w, "Thanos Querier is healthy.\n"); err != nil { level.Error(logger).Log("msg", "Could not write health check response.") } }) + router.Get("/-/ready", func(w http.ResponseWriter, r *http.Request) { + if querierIsReady { + w.WriteHeader(http.StatusOK) + if _, err := fmt.Fprintf(w, "Thanos Querier is ready.\n"); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + return + } + w.WriteHeader(http.StatusServiceUnavailable) + if _, err := fmt.Fprintf(w, "Thanos Querier is not ready.\n"); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + }) + mux := http.NewServeMux() registerMetrics(mux, reg) registerProfile(mux) @@ -405,6 +421,7 @@ func runQuery( level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr) return errors.Wrap(http.Serve(l, mux), "serve query") }, func(error) { + querierIsReady = false runutil.CloseWithLogOnErr(logger, l, "query and metric listener") }) } @@ -426,9 +443,11 @@ func runQuery( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + querierIsReady = true return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { s.Stop() + querierIsReady = false runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index b6d21f34815..7c084a0d532 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -325,8 +325,10 @@ func runRule( cancel() }) } + var ruleIsReady = false { var storeLset []storepb.Label + for _, l := range lset { storeLset = append(storeLset, storepb.Label{Name: l.Name, Value: l.Value}) } @@ -504,30 +506,7 @@ func runRule( cancel() }) } - // Start gRPC server. - { - l, err := net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") - } - logger := log.With(logger, "component", "store") - store := store.NewTSDBStore(logger, reg, db, lset) - - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC options") - } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, store) - - g.Add(func() error { - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() - runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") - }) - } // Start UI & metrics HTTP server. { router := route.New() @@ -535,6 +514,28 @@ func runRule( reload <- struct{}{} }) + router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if _, err = fmt.Fprintf(w, "Thanos Rule is Healthy.\n"); err != nil { + level.Error(logger).Log("msg", "Could not write health check response.") + } + }) + + router.Get("/-/ready", func(w http.ResponseWriter, r *http.Request) { + if ruleIsReady { + w.WriteHeader(http.StatusOK) + if _, err := fmt.Fprintf(w, "Thanos rule is ready.\n"); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + return + } + w.WriteHeader(http.StatusServiceUnavailable) + if _, err := fmt.Fprintf(w, "Thanos rule is not ready.\n"); err != nil { + level.Error(logger).Log("msg", "Could not write readiness check response.") + } + + }) + ui.NewRuleUI(logger, mgr, alertQueryURL.String()).Register(router) mux := http.NewServeMux() @@ -551,10 +552,38 @@ func runRule( level.Info(logger).Log("msg", "Listening for ui requests", "address", httpBindAddr) return errors.Wrap(http.Serve(l, mux), "serve query") }, func(error) { + ruleIsReady = false runutil.CloseWithLogOnErr(logger, l, "query and metric listener") }) } + // Start gRPC server. + { + l, err := net.Listen("tcp", grpcBindAddr) + if err != nil { + return errors.Wrap(err, "listen API address") + } + logger := log.With(logger, "component", "store") + + store := store.NewTSDBStore(logger, reg, db, lset) + + opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC options") + } + s := grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, store) + + g.Add(func() error { + ruleIsReady = true + return errors.Wrap(s.Serve(l), "serve gRPC") + }, func(error) { + s.Stop() + ruleIsReady = false + runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") + }) + } + var uploads = true bucketConfig, err := objStoreConfig.Content() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index da1f82d1f40..9258da459f3 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -102,13 +102,24 @@ func runSidecar( reloader *reloader.Reloader, component string, ) error { - var metadata = &metadata{ - promURL: promURL, + var ( + metadata = &metadata{ + promURL: promURL, + + // Start out with the full time range. The shipper will constrain it later. + // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. + mint: 0, + maxt: math.MaxInt64, + } + sidecarIsReady = false + ) + + readinessFunc := func() (bool, error) { + return sidecarIsReady, nil + } - // Start out with the full time range. The shipper will constrain it later. - // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. - mint: 0, - maxt: math.MaxInt64, + if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component, readinessFunc); err != nil { + return err } // Setup all the concurrent groups. @@ -149,7 +160,7 @@ func runSidecar( return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured") } - // New gossip cluster. + // New gossip cluster.grpcBindAddr mint, maxt := metadata.Timestamps() if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{ Labels: metadata.LabelsPB(), @@ -168,11 +179,13 @@ func runSidecar( if err := metadata.UpdateLabels(iterCtx, logger); err != nil { level.Warn(logger).Log("msg", "heartbeat failed", "err", err) promUp.Set(0) + sidecarIsReady = false } else { // Update gossip. peer.SetLabels(metadata.LabelsPB()) promUp.Set(1) + sidecarIsReady = true lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9) } @@ -191,9 +204,7 @@ func runSidecar( cancel() }) } - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { - return err - } + { l, err := net.Listen("tcp", grpcBindAddr) if err != nil { @@ -218,8 +229,10 @@ func runSidecar( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + sidecarIsReady = true return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { + sidecarIsReady = false s.Stop() runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index b697cbeb8ce..0b61dd62bdb 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -88,11 +88,20 @@ func runStore( syncInterval time.Duration, ) error { { + storeIsReady := false bucketConfig, err := objStoreConfig.Content() if err != nil { return err } + readinessFunc := func() (bool, error) { + return storeIsReady, nil + } + + if err = metricHTTPListenGroup(g, logger, reg, httpBindAddr, component, readinessFunc); err != nil { + return err + } + bkt, err := client.NewBucket(logger, bucketConfig, reg, component) if err != nil { return errors.Wrap(err, "create bucket client") @@ -158,8 +167,10 @@ func runStore( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + storeIsReady = true return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { + storeIsReady = false runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) } @@ -184,9 +195,6 @@ func runStore( peer.Close(5 * time.Second) }) } - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { - return err - } level.Info(logger).Log("msg", "starting store node") return nil diff --git a/kube/manifests/prometheus-gcs.yaml b/kube/manifests/prometheus-gcs.yaml index c764fc853a1..e3745035626 100644 --- a/kube/manifests/prometheus-gcs.yaml +++ b/kube/manifests/prometheus-gcs.yaml @@ -73,6 +73,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: sidecar-http + readinessProbe: + httpGet: + path: /-/ready + port: sidecar-http volumeMounts: - name: data mountPath: /var/prometheus diff --git a/kube/manifests/prometheus.yaml b/kube/manifests/prometheus.yaml index af9abd77c8f..f02d0a5874f 100644 --- a/kube/manifests/prometheus.yaml +++ b/kube/manifests/prometheus.yaml @@ -68,6 +68,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: sidecar-http + readinessProbe: + httpGet: + path: /-/ready + port: sidecar-http volumeMounts: - name: data mountPath: /var/prometheus diff --git a/kube/manifests/thanos-query.yaml b/kube/manifests/thanos-query.yaml index 9eadc85c8f5..b71fb3c549f 100644 --- a/kube/manifests/thanos-query.yaml +++ b/kube/manifests/thanos-query.yaml @@ -40,6 +40,10 @@ spec: httpGet: path: /-/healthy port: http + readinessProbe: + httpGet: + path: /-/ready + port: http --- apiVersion: v1 kind: Service diff --git a/kube/manifests/thanos-store.yaml b/kube/manifests/thanos-store.yaml index 3b2c98d1fa9..c6e61bf10be 100644 --- a/kube/manifests/thanos-store.yaml +++ b/kube/manifests/thanos-store.yaml @@ -39,6 +39,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: http + readinessProbe: + httpGet: + path: /-/ready + port: http volumeMounts: - mountPath: /creds/ name: gcs-credentials