Skip to content

Commit

Permalink
feat: Added /-/healthy endpoint to Rule and the generic metrics HTTP …
Browse files Browse the repository at this point in the history
…server.

Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Dec 10, 2018
1 parent fe17cf7 commit 16fd343
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 42 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Added
- '/-/healthy' and `/-/ready` endpoint to all node types.

## [v0.2.0](https://github.com/improbable-eng/thanos/releases/tag/v0.2.0) - 2018.12.10

Next Thanos release adding support to new discovery method, gRPC mTLS and two new object store providers (Swift and Azure).

Note lots of necessary breaking changes in flags that relates to bucket configuration.

### Deprecated
Expand All @@ -39,7 +42,7 @@ Note lots of necessary breaking changes in flags that relates to bucket configur
### Changed

- *breaking*: Added `thanos_` prefix to memberlist (gossip) metrics. Make sure to update your dashboards and rules.
- S3 provider:
- S3 provider:
- Set `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation.

### Added
Expand All @@ -53,7 +56,7 @@ Note lots of necessary breaking changes in flags that relates to bucket configur
- In `thanos query`, file based discovery of store nodes using `--store.file-sd-config.files`
- `/-/healthy` endpoint to Querier.
- DNS service discovery to static and file based configurations using the `dns+` and `dnssrv+` prefixes for the respective lookup. Details [here](/docs/thanos_service_discovery.md)
- `--cluster.disable` flag to disable gossip functionality completely.
- `--cluster.disable` flag to disable gossip functionality completely.
- Hidden flag to configure max compaction level.
- Azure Storage.
- OpenStack Swift support.
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ func runCompact(
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
readinessFunc := func() (bool, error) {
return true, nil
}

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component, readinessFunc); err != nil {
return err
}

Expand Down
36 changes: 35 additions & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,38 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

func registerHealthy(mux *http.ServeMux, logger log.Logger, component string) {
mux.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos %v is Healthy.\n", component); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
level.Error(logger).Log("msg", "Could not write health check response.")
}
})
}

func registerReady(mux *http.ServeMux, logger log.Logger, component string, readinessFunc func() (bool, error)) {
mux.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) {
ready, err := readinessFunc()
if err != nil {
level.Error(logger).Log("msg", "Failed to get readiness status.")
ready = false
}

if ready {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos %v is ready.\n", component); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}
return
}
w.WriteHeader(http.StatusServiceUnavailable)
if _, err := fmt.Fprintf(w, "Thanos %v is not ready.\n", component); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}
})
}

// defaultGRPCServerOpts returns default gRPC server opts that includes:
// - request histogram
// - tracing
Expand Down Expand Up @@ -294,10 +326,12 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, component string, readinessFunc func() (bool, error)) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
registerHealthy(mux, logger, component)
registerReady(mux, logger, component, readinessFunc)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
)

func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil)
testutil.Ok(t, err)
return http.DefaultClient.Do(req.WithContext(ctx))
}

func isHealthy() (bool, error) {
return true, nil
}

func TestGenericHttpEndpoints(t *testing.T) {
var g run.Group
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
metricsRegistry := prometheus.NewRegistry()
component := "sidecar"
ctx := context.Background()

freePort, err := testutil.FreePort()
testutil.Ok(t, err)

serverAddress := fmt.Sprintf("127.0.0.1:%d", freePort)

err = metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, component, isHealthy)
testutil.Ok(t, err)
go func() { _ = g.Run() }()

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/healthy"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/ready"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))
}
21 changes: 20 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ func runQuery(
fileSD *file.Discovery,
dnsSDInterval time.Duration,
) error {
var querierIsReady = false

duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_query_duplicated_store_address",
Help: "The number of times a duplicated store addresses is detected from the different configs in query",
Expand Down Expand Up @@ -386,11 +388,25 @@ func runQuery(

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
if _, err := fmt.Fprintf(w, "Thanos Querier is healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})

router.Get("/-/ready", func(w http.ResponseWriter, r *http.Request) {
if querierIsReady {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is ready.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}
return
}
w.WriteHeader(http.StatusServiceUnavailable)
if _, err := fmt.Fprintf(w, "Thanos Querier is not ready.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}
})

mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
Expand All @@ -405,6 +421,7 @@ func runQuery(
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
querierIsReady = false
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}
Expand All @@ -426,9 +443,11 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
querierIsReady = true
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
querierIsReady = false
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
Expand Down
75 changes: 52 additions & 23 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,10 @@ func runRule(
cancel()
})
}
var ruleIsReady = false
{
var storeLset []storepb.Label

for _, l := range lset {
storeLset = append(storeLset, storepb.Label{Name: l.Name, Value: l.Value})
}
Expand Down Expand Up @@ -504,37 +506,36 @@ func runRule(
cancel()
})
}
// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
// Start UI & metrics HTTP server.
{
router := route.New()
router.Post("/-/reload", func(w http.ResponseWriter, r *http.Request) {
reload <- struct{}{}
})

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err = fmt.Fprintf(w, "Thanos Rule is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})

router.Get("/-/ready", func(w http.ResponseWriter, r *http.Request) {
if ruleIsReady {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos rule is ready.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}
return
}
w.WriteHeader(http.StatusServiceUnavailable)
if _, err := fmt.Fprintf(w, "Thanos rule is not ready.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write readiness check response.")
}

})

ui.NewRuleUI(logger, mgr, alertQueryURL.String()).Register(router)

mux := http.NewServeMux()
Expand All @@ -551,10 +552,38 @@ func runRule(
level.Info(logger).Log("msg", "Listening for ui requests", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
ruleIsReady = false
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}

// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
ruleIsReady = true
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
ruleIsReady = false
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
Expand Down
Loading

0 comments on commit 16fd343

Please sign in to comment.