Skip to content

Commit

Permalink
feat: Added /-/healthy endpoint to Rule and the generic metrics HTTP …
Browse files Browse the repository at this point in the history
…server.

Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Jan 13, 2019
1 parent b6dcbf5 commit 9680def
Show file tree
Hide file tree
Showing 15 changed files with 506 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- Partial Response disable option for StoreAPI and QueryAPI.
- Partial Response disable button on Thanos UI
- [#644](https://github.com/improbable-eng/thanos/issues/644) Added `/-/healthy` and `/-/ready` endpoints to all node types.

### Fixed

Expand Down
14 changes: 9 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func runCompact(
return errors.Wrap(err, "create compactor")
}

readinessProber, err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component)
if err != nil {
readinessProber.SetNotHealthy(err)
return err
}

var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
Expand Down Expand Up @@ -270,14 +276,12 @@ func runCompact(

return errors.Wrap(err, "error executing compaction")
})
}, func(error) {
}, func(err error) {
readinessProber.SetNotReady(err)
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
readinessProber.SetReady()
return nil
}
11 changes: 7 additions & 4 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,21 +294,24 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, component string) (*prober.Prober, error) {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber := prober.NewProbeInMux(component, mux, logger)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
return nil, errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
return readinessProber, nil
}
68 changes: 68 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"net/http"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
)

func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil)
testutil.Ok(t, err)
return http.DefaultClient.Do(req.WithContext(ctx))
}

func TestGenericHttpEndpoints(t *testing.T) {
var g run.Group
logger := log.NewNopLogger()
metricsRegistry := prometheus.NewRegistry()
component := "sidecar"
ctx := context.Background()

freePort, err := testutil.FreePort()
testutil.Ok(t, err)

serverAddress := fmt.Sprintf("127.0.0.1:%d", freePort)

readinessProber, err := metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, component)
testutil.Ok(t, err)
go func() { _ = g.Run() }()

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/healthy"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

readinessProber.SetReady()
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/ready"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))
}
19 changes: 11 additions & 8 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/prober"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -128,6 +129,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*key,
*caCert,
*serverName,
name,
*httpBindAddr,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
Expand Down Expand Up @@ -240,6 +242,7 @@ func runQuery(
key string,
caCert string,
serverName string,
component string,
httpBindAddr string,
maxConcurrentQueries int,
queryTimeout time.Duration,
Expand Down Expand Up @@ -372,6 +375,7 @@ func runQuery(
cancel()
})
}
var readinessProber *prober.Prober
// Start query API + UI HTTP server.
{
router := route.New()
Expand All @@ -380,12 +384,7 @@ func runQuery(
api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse)
api.Register(router.WithPrefix("/api/v1"), tracer, logger)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})
readinessProber = prober.NewProbeInRouter(component, router, logger)

mux := http.NewServeMux()
registerMetrics(mux, reg)
Expand All @@ -399,8 +398,10 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}
Expand All @@ -422,9 +423,11 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
readinessProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
}, func(err error) {
s.Stop()
readinessProber.SetNotReady(err)
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
Expand Down
62 changes: 36 additions & 26 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"time"

"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/prober"
opentracing "github.com/opentracing/opentracing-go"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -36,7 +39,6 @@ import (
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/improbable-eng/thanos/pkg/ui"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand All @@ -50,7 +52,6 @@ import (
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerRule registers a rule command.
Expand Down Expand Up @@ -502,37 +503,17 @@ func runRule(
cancel()
})
}
// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
var readinessProber *prober.Prober
// Start UI & metrics HTTP server.
{
router := route.New()
router.Post("/-/reload", func(w http.ResponseWriter, r *http.Request) {
reload <- struct{}{}
})

readinessProber = prober.NewProbeInRouter(component, router, logger)

ui.NewRuleUI(logger, mgr, alertQueryURL.String()).Register(router)

mux := http.NewServeMux()
Expand All @@ -547,12 +528,41 @@ func runRule(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for ui requests", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}

// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
readinessProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(err error) {
s.Stop()
readinessProber.SetNotReady(err)
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
Expand Down
Loading

0 comments on commit 9680def

Please sign in to comment.