Skip to content

Commit

Permalink
feat: added /-/healthy and /-/ready endppoints
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
Martin Chodur authored and FUSAKLA committed Apr 15, 2019
1 parent e6d5b49 commit a8d9a27
Show file tree
Hide file tree
Showing 25 changed files with 643 additions and 226 deletions.
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#644](https://github.com/improbable-eng/thanos/issues/644) Added `/-/healthy` and `/-/ready` endpoints to all node types
### Fixed
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.
- [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag).

New options:

* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources.
* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Considering making it lower or bigger depending on the scale of your deployment.
* `--index.generate-missing-cache-file` if enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload. By default is disabled. Check logs on existence the line `generating index cache files is done`, then you can disable this flag.
* `--index.generate-missing-cache-file` if enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload. By default is disabled. Check logs on existence the line `generating index cache files is done`, then you can disable this flag.

New metrics:
* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
Expand All @@ -34,17 +36,18 @@ New tracing span:
:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`.
- [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client).
- [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client).
This to have SRV resolution working on [Golang 1.11+ with KubeDNS below v1.14](https://github.com/golang/go/issues/27546)
- [#986](https://github.com/improbable-eng/thanos/pull/986) Store index cache files in object storage, reduces store start-up time by skipping the generating the index cache for all blocks and only do this for recently created uncompacted blocks.
- [#986](https://github.com/improbable-eng/thanos/pull/986) Store index cache files in object storage, reduces store start-up time by skipping the generating the index cache for all blocks and only do this for recently created uncompacted blocks.

### Changed
### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API.
- [#970](https://github.com/improbable-eng/thanos/pull/970) No `PartialResponseStrategy` field for `RuleGroups` by default means `abort` strategy (old PartialResponse disabled) as this is recommended option for Rules and alerts.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks, that overlap `0s`
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks, that overlap `0s`
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far

## [v0.3.2](https://github.com/improbable-eng/thanos/releases/tag/v0.3.2) - 2019.03.04

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/olekukonko/tablewriter"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
Expand Down
26 changes: 14 additions & 12 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"gopkg.in/alecthomas/kingpin.v2"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
Expand Down Expand Up @@ -124,7 +126,6 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compact.ResolutionLevel5m: time.Duration(*retention5m),
compact.ResolutionLevel1h: time.Duration(*retention1h),
},
name,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
Expand All @@ -146,7 +147,6 @@ func runCompact(
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
Expand All @@ -167,12 +167,12 @@ func runCompact(

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
return errors.Wrap(err, "loading flag content")
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Compact.String())
if err != nil {
return err
return errors.Wrap(err, "initializing bucket")
}

// Ensure we close up everything properly.
Expand Down Expand Up @@ -204,6 +204,12 @@ func runCompact(
return errors.Wrap(err, "create compactor")
}

readinessProber := prober.NewProber(component.Compact, logger)
err = metricHTTPListenGroup(g, logger, reg, httpBindAddr, *readinessProber)
if err != nil {
return errors.Wrap(err, "create readiness prober")
}

var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
Expand Down Expand Up @@ -307,14 +313,10 @@ func runCompact(

return errors.Wrap(err, "error executing compaction")
})
}, func(error) {
}, func(err error) {
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"gopkg.in/alecthomas/kingpin.v2"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerDownsample(m map[string]setupFunc, app *kingpin.Application, name string) {
Expand All @@ -49,12 +49,12 @@ func runDownsample(
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
return errors.Wrap(err, "loading flag content")
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Downsample.String())
if err != nil {
return err
return errors.Wrap(err, "initializing bucket")
}

// Ensure we close up everything properly.
Expand Down
12 changes: 8 additions & 4 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import (
"syscall"

gmetrics "github.com/armon/go-metrics"

gprom "github.com/armon/go-metrics/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -295,10 +295,11 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber prober.Prober) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
Expand All @@ -307,8 +308,11 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
readinessProber.SetReady()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
Expand Down
63 changes: 63 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"context"
"fmt"
"net/http"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
)

type TestComponent struct {
name string
}

func (c TestComponent) String() string {
return c.name
}

func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil)
testutil.Ok(t, err)
return http.DefaultClient.Do(req.WithContext(ctx))
}

func TestGenericHttpEndpoints(t *testing.T) {
var g run.Group
logger := log.NewNopLogger()
metricsRegistry := prometheus.NewRegistry()
component := TestComponent{name: "sidecar"}
ctx := context.Background()

freePort, err := testutil.FreePort()
testutil.Ok(t, err)

serverAddress := fmt.Sprintf("127.0.0.1:%d", freePort)

p := prober.NewProber(component, logger)
err = metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, *p)
testutil.Ok(t, err)
go func() { _ = g.Run() }()

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))
}
22 changes: 14 additions & 8 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/query"
v1 "github.com/improbable-eng/thanos/pkg/query/api"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand Down Expand Up @@ -300,6 +301,8 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

readinessProber := prober.NewProber(component.Query, logger)

var (
stores = query.NewStoreSet(
logger,
Expand Down Expand Up @@ -340,6 +343,7 @@ func runQuery(
},
)
)

// Periodically update the store set with the addresses we see in our cluster.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -388,6 +392,7 @@ func runQuery(
close(fileSDUpdates)
})
}

{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
Expand Down Expand Up @@ -415,6 +420,7 @@ func runQuery(
cancel()
})
}

// Start query API + UI HTTP server.
{
router := route.New()
Expand All @@ -438,12 +444,7 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})
readinessProber.RegisterInRouter(router)

mux := http.NewServeMux()
registerMetrics(mux, reg)
Expand All @@ -457,11 +458,14 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}

// Start query (proxy) gRPC StoreAPI.
{
l, err := net.Listen("tcp", grpcBindAddr)
Expand All @@ -480,8 +484,10 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
readinessProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
}, func(err error) {
readinessProber.SetNotReady(err)
s.Stop()
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
Expand Down
Loading

0 comments on commit a8d9a27

Please sign in to comment.