diff --git a/CHANGELOG.md b/CHANGELOG.md index 76c933a9308..0ebc235a4b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - Partial Response disable option for StoreAPI and QueryAPI. - Partial Response disable button on Thanos UI +- [#644](https://github.com/improbable-eng/thanos/issues/644) Added `/-/healthy` and `/-/ready` endpoints to all node types. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5a483efe3a2..93e68478b87 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -178,6 +178,12 @@ func runCompact( return errors.Wrap(err, "create compactor") } + readinessProber, err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component) + if err != nil { + readinessProber.SetNotHealthy(err) + return err + } + var ( compactDir = path.Join(dataDir, "compact") downsamplingDir = path.Join(dataDir, "downsample") @@ -270,14 +276,12 @@ func runCompact( return errors.Wrap(err, "error executing compaction") }) - }, func(error) { + }, func(err error) { + readinessProber.SetNotReady(err) cancel() }) - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { - return err - } - level.Info(logger).Log("msg", "starting compact node") + readinessProber.SetReady() return nil } diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b0bb1f75d36..f1c22c6753f 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -294,21 +294,24 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o } // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. 
-func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error { +func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, component string) (*prober.Prober, error) { mux := http.NewServeMux() registerMetrics(mux, reg) registerProfile(mux) + readinessProber := prober.NewProbeInMux(component, mux, logger) l, err := net.Listen("tcp", httpBindAddr) if err != nil { - return errors.Wrap(err, "listen metrics address") + return nil, errors.Wrap(err, "listen metrics address") } g.Add(func() error { level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr) + readinessProber.SetHealthy() return errors.Wrap(http.Serve(l, mux), "serve metrics") - }, func(error) { + }, func(err error) { + readinessProber.SetNotHealthy(err) runutil.CloseWithLogOnErr(logger, l, "metric listener") }) - return nil + return readinessProber, nil } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go new file mode 100644 index 00000000000..1d5db30e8b3 --- /dev/null +++ b/cmd/thanos/main_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" +) + +func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil) + testutil.Ok(t, err) + return http.DefaultClient.Do(req.WithContext(ctx)) +} + +func TestGenericHttpEndpoints(t *testing.T) { + var g run.Group + logger := log.NewNopLogger() + metricsRegistry := prometheus.NewRegistry() + component := "sidecar" + ctx := context.Background() + + freePort, err := testutil.FreePort() + testutil.Ok(t, err) + + serverAddress := 
fmt.Sprintf("127.0.0.1:%d", freePort) + + readinessProber, err := metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, component) + testutil.Ok(t, err) + go func() { _ = g.Run() }() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/healthy")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + readinessProber.SetReady() + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/ready")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/")) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bf6cfa64fe8..08b38d690fb 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -12,6 +12,7 @@ import ( "time" "github.com/improbable-eng/thanos/pkg/extprom" + "github.com/improbable-eng/thanos/pkg/prober" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -128,6 +129,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string *key, *caCert, *serverName, + name, *httpBindAddr, *maxConcurrentQueries, time.Duration(*queryTimeout), @@ -240,6 +242,7 @@ func runQuery( key string, caCert string, serverName string, + component string, httpBindAddr string, maxConcurrentQueries int, queryTimeout time.Duration, @@ -372,6 
+375,7 @@ func runQuery( cancel() }) } + var readinessProber *prober.Prober // Start query API + UI HTTP server. { router := route.New() @@ -380,12 +384,7 @@ func runQuery( api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse) api.Register(router.WithPrefix("/api/v1"), tracer, logger) - router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil { - level.Error(logger).Log("msg", "Could not write health check response.") - } - }) + readinessProber = prober.NewProbeInRouter(component, router, logger) mux := http.NewServeMux() registerMetrics(mux, reg) @@ -399,8 +398,10 @@ func runQuery( g.Add(func() error { level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr) + readinessProber.SetHealthy() return errors.Wrap(http.Serve(l, mux), "serve query") - }, func(error) { + }, func(err error) { + readinessProber.SetNotHealthy(err) runutil.CloseWithLogOnErr(logger, l, "query and metric listener") }) } @@ -422,9 +423,11 @@ func runQuery( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + readinessProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { + }, func(err error) { s.Stop() + readinessProber.SetNotReady(err) runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 03fed102d2e..c50d599b047 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -20,6 +20,9 @@ import ( "time" "github.com/improbable-eng/thanos/pkg/extprom" + "github.com/improbable-eng/thanos/pkg/prober" + opentracing "github.com/opentracing/opentracing-go" + kingpin "gopkg.in/alecthomas/kingpin.v2" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -36,7 +39,6 @@ import ( "github.com/improbable-eng/thanos/pkg/tracing" 
"github.com/improbable-eng/thanos/pkg/ui" "github.com/oklog/run" - "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -50,7 +52,6 @@ import ( "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" - "gopkg.in/alecthomas/kingpin.v2" ) // registerRule registers a rule command. @@ -502,30 +503,8 @@ func runRule( cancel() }) } - // Start gRPC server. - { - l, err := net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") - } - logger := log.With(logger, "component", "store") - store := store.NewTSDBStore(logger, reg, db, lset) - - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC options") - } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, store) - - g.Add(func() error { - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() - runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") - }) - } + var readinessProber *prober.Prober // Start UI & metrics HTTP server. { router := route.New() @@ -533,6 +512,8 @@ func runRule( reload <- struct{}{} }) + readinessProber = prober.NewProbeInRouter(component, router, logger) + ui.NewRuleUI(logger, mgr, alertQueryURL.String()).Register(router) mux := http.NewServeMux() @@ -547,12 +528,41 @@ func runRule( g.Add(func() error { level.Info(logger).Log("msg", "Listening for ui requests", "address", httpBindAddr) + readinessProber.SetHealthy() return errors.Wrap(http.Serve(l, mux), "serve query") - }, func(error) { + }, func(err error) { + readinessProber.SetNotHealthy(err) runutil.CloseWithLogOnErr(logger, l, "query and metric listener") }) } + // Start gRPC server. 
+ { + l, err := net.Listen("tcp", grpcBindAddr) + if err != nil { + return errors.Wrap(err, "listen API address") + } + logger := log.With(logger, "component", "store") + + store := store.NewTSDBStore(logger, reg, db, lset) + + opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC options") + } + s := grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, store) + + g.Add(func() error { + readinessProber.SetReady() + return errors.Wrap(s.Serve(l), "serve gRPC") + }, func(err error) { + s.Stop() + readinessProber.SetNotReady(err) + runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") + }) + } + var uploads = true bucketConfig, err := objStoreConfig.Content() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 61f493f93a8..c39f0519b9b 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -102,13 +102,21 @@ func runSidecar( reloader *reloader.Reloader, component string, ) error { - var metadata = &metadata{ - promURL: promURL, + var ( + metadata = &metadata{ + promURL: promURL, + + // Start out with the full time range. The shipper will constrain it later. + // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. + mint: 0, + maxt: math.MaxInt64, + } + ) - // Start out with the full time range. The shipper will constrain it later. - // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. - mint: 0, - maxt: math.MaxInt64, + readinessProber, err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component) + if err != nil { + readinessProber.SetNotHealthy(err) + return err } // Setup all the concurrent groups. @@ -133,6 +141,7 @@ func runSidecar( "msg", "failed to fetch initial external labels. Is Prometheus running? 
Retrying", "err", err, ) + readinessProber.SetNotReady(err) promUp.Set(0) return err } @@ -168,18 +177,21 @@ func runSidecar( if err := metadata.UpdateLabels(iterCtx, logger); err != nil { level.Warn(logger).Log("msg", "heartbeat failed", "err", err) promUp.Set(0) + readinessProber.SetNotReady(err) } else { // Update gossip. peer.SetLabels(metadata.LabelsPB()) promUp.Set(1) + readinessProber.SetReady() lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9) } return nil }) - }, func(error) { + }, func(err error) { cancel() + readinessProber.SetNotReady(err) peer.Close(2 * time.Second) }) } @@ -187,13 +199,12 @@ func runSidecar( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return reloader.Watch(ctx) - }, func(error) { + }, func(err error) { + readinessProber.SetNotReady(err) cancel() }) } - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { - return err - } + { l, err := net.Listen("tcp", grpcBindAddr) if err != nil { @@ -218,8 +229,10 @@ func runSidecar( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + readinessProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { + }, func(err error) { + readinessProber.SetNotReady(err) s.Stop() runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) @@ -278,6 +291,7 @@ func runSidecar( } level.Info(logger).Log("msg", "starting sidecar", "peer", peer.Name()) + readinessProber.SetReady() return nil } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index f8e03163c54..2dd871024fa 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" + "github.com/improbable-eng/thanos/pkg/prober" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store" 
"github.com/improbable-eng/thanos/pkg/store/storepb" @@ -87,12 +88,19 @@ func runStore( verbose bool, syncInterval time.Duration, ) error { + var readinessProber *prober.Prober { bucketConfig, err := objStoreConfig.Content() if err != nil { return err } + readinessProber, err = metricHTTPListenGroup(g, logger, reg, httpBindAddr, component) + if err != nil { + readinessProber.SetNotHealthy(err) + return err + } + bkt, err := client.NewBucket(logger, bucketConfig, reg, component) if err != nil { return errors.Wrap(err, "create bucket client") @@ -158,8 +166,10 @@ func runStore( g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + readinessProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { + }, func(err error) { + readinessProber.SetNotReady(err) runutil.CloseWithLogOnErr(logger, l, "store gRPC listener") }) } @@ -184,10 +194,8 @@ func runStore( peer.Close(5 * time.Second) }) } - if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil { - return err - } level.Info(logger).Log("msg", "starting store node") + readinessProber.SetReady() return nil } diff --git a/kube/manifests/prometheus-gcs.yaml b/kube/manifests/prometheus-gcs.yaml index c12910ca9af..a67f80dea5f 100644 --- a/kube/manifests/prometheus-gcs.yaml +++ b/kube/manifests/prometheus-gcs.yaml @@ -73,6 +73,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: sidecar-http + readinessProbe: + httpGet: + path: /-/ready + port: sidecar-http volumeMounts: - name: data mountPath: /var/prometheus diff --git a/kube/manifests/prometheus.yaml b/kube/manifests/prometheus.yaml index 5ae892574d5..a22e82c183d 100644 --- a/kube/manifests/prometheus.yaml +++ b/kube/manifests/prometheus.yaml @@ -106,6 +106,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: sidecar-http + 
readinessProbe: + httpGet: + path: /-/ready + port: sidecar-http volumeMounts: - name: data mountPath: /var/prometheus diff --git a/kube/manifests/thanos-query.yaml b/kube/manifests/thanos-query.yaml index e013627e56e..00d23737b95 100644 --- a/kube/manifests/thanos-query.yaml +++ b/kube/manifests/thanos-query.yaml @@ -40,6 +40,10 @@ spec: httpGet: path: /-/healthy port: http + readinessProbe: + httpGet: + path: /-/ready + port: http --- apiVersion: v1 kind: Service diff --git a/kube/manifests/thanos-store.yaml b/kube/manifests/thanos-store.yaml index 723cbb3c1ef..04c6eb82b8c 100644 --- a/kube/manifests/thanos-store.yaml +++ b/kube/manifests/thanos-store.yaml @@ -39,6 +39,14 @@ spec: containerPort: 10901 - name: cluster containerPort: 10900 + livenessProbe: + httpGet: + path: /-/healthy + port: http + readinessProbe: + httpGet: + path: /-/ready + port: http volumeMounts: - mountPath: /creds/ name: gcs-credentials diff --git a/pkg/prober/prober.go b/pkg/prober/prober.go new file mode 100644 index 00000000000..c2378f4128f --- /dev/null +++ b/pkg/prober/prober.go @@ -0,0 +1,144 @@ +package prober + +import ( + "fmt" + "io" + "net/http" + "sync" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/route" +) + +const ( + healthyEndpointPath = "/-/healthy" + readyEndpointPath = "/-/ready" + okProbeText = "thanos %s is %s" + probeErrorHTTPStatus = 500 + initialErrorText = "thanos %s is initializing" +) + +// Prober represents health and readiness status of given component. +type Prober struct { + logger log.Logger + loggerMtx sync.Mutex + componentMtx sync.Mutex + component string + readyMtx sync.Mutex + readiness error + healthyMtx sync.Mutex + healthiness error +} + +// SetLogger sets logger used by the Prober.
+func (p *Prober) SetLogger(logger log.Logger) { + p.loggerMtx.Lock() + defer p.loggerMtx.Unlock() + p.logger = logger +} + +func (p *Prober) getLogger() log.Logger { + p.loggerMtx.Lock() + defer p.loggerMtx.Unlock() + return p.logger +} + +// SetComponent sets component name of the Prober displayed in responses. +func (p *Prober) SetComponent(component string) { + p.componentMtx.Lock() + defer p.componentMtx.Unlock() + p.component = component +} + +func (p *Prober) getComponent() string { + p.componentMtx.Lock() + defer p.componentMtx.Unlock() + return p.component +} + +// NewProber returns Prober reprezenting readiness and healthiness of given component. +func NewProber(component string, logger log.Logger) *Prober { + initialErr := fmt.Errorf(initialErrorText, component) + prober := &Prober{} + prober.SetComponent(component) + prober.SetLogger(logger) + prober.SetNotHealthy(initialErr) + prober.SetNotReady(initialErr) + return prober +} + +// NewProbeInRouter returns new Prober which registers it's ready and health endpoints to given router. +func NewProbeInRouter(component string, router *route.Router, logger log.Logger) *Prober { + prober := NewProber(component, logger) + router.Get(healthyEndpointPath, prober.probeHandlerFunc(prober.IsHealthy, "healthy")) + router.Get(readyEndpointPath, prober.probeHandlerFunc(prober.IsReady, "ready")) + return prober +} + +// NewProbeInMux returns new Prober which registers it's ready and health endpoints to given mux. 
+func NewProbeInMux(component string, mux *http.ServeMux, logger log.Logger) *Prober { + prober := NewProber(component, logger) + mux.HandleFunc(healthyEndpointPath, prober.probeHandlerFunc(prober.IsHealthy, "healthy")) + mux.HandleFunc(readyEndpointPath, prober.probeHandlerFunc(prober.IsReady, "ready")) + return prober +} + +func (p *Prober) probeHandlerFunc(probeFunc func() error, probeType string) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, _ *http.Request) { + err := probeFunc() + if err == nil { + if _, e := io.WriteString(w, fmt.Sprintf(okProbeText, p.getComponent(), probeType)); e == nil { + level.Error(p.getLogger()).Log("msg", "failed to write probe response", "probe type", probeType, "err", err) + } + } else { + http.Error(w, err.Error(), probeErrorHTTPStatus) + } + } +} + +// IsReady returns error if component is not ready and nil if it is. +func (p *Prober) IsReady() error { + p.readyMtx.Lock() + defer p.readyMtx.Unlock() + return p.readiness +} + +// SetReady sets components status to ready. +func (p *Prober) SetReady() { + level.Debug(p.getLogger()).Log("msg", "changing probe status", "status", "ready") + p.SetNotReady(nil) +} + +// SetNotReady sets components status to not ready with given error as a cause. +func (p *Prober) SetNotReady(err error) { + p.readyMtx.Lock() + defer p.readyMtx.Unlock() + if err != nil { + level.Debug(p.getLogger()).Log("msg", "changing probe status", "status", "not-ready", "reason", err) + } + p.readiness = err +} + +// IsHealthy returns error if component is not healthy and nil if it is. +func (p *Prober) IsHealthy() error { + p.healthyMtx.Lock() + defer p.healthyMtx.Unlock() + return p.healthiness +} + +// SetHealthy sets components status to healthy. +func (p *Prober) SetHealthy() { + level.Debug(p.getLogger()).Log("msg", "changing probe status", "status", "healthy") + p.SetNotHealthy(nil) +} + +// SetNotHealthy sets components status to not healthy with given error as a cause. 
+func (p *Prober) SetNotHealthy(err error) { + p.healthyMtx.Lock() + defer p.healthyMtx.Unlock() + if err != nil { + level.Debug(p.getLogger()).Log("msg", "changing probe status", "status", "unhealthy", "reason", err) + } + p.healthiness = err +} diff --git a/pkg/prober/prober_test.go b/pkg/prober/prober_test.go new file mode 100644 index 00000000000..ca272a32450 --- /dev/null +++ b/pkg/prober/prober_test.go @@ -0,0 +1,162 @@ +package prober + +import ( + "context" + "fmt" + "net" + "net/http" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/run" + "github.com/pkg/errors" + "github.com/prometheus/common/route" +) + +func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil) + testutil.Ok(t, err) + return http.DefaultClient.Do(req.WithContext(ctx)) +} + +func TestProberHealthInitialState(t *testing.T) { + component := "test" + expectedErrorMessage := fmt.Sprintf(initialErrorText, component) + p := NewProber(component, log.NewNopLogger()) + + err := p.IsHealthy() + testutil.NotOk(t, err) + testutil.Equals(t, err.Error(), expectedErrorMessage) +} + +func TestProberReadinessInitialState(t *testing.T) { + component := "test" + expectedErrorMessage := fmt.Sprintf(initialErrorText, component) + p := NewProber(component, log.NewNopLogger()) + + err := p.IsReady() + testutil.NotOk(t, err) + testutil.Equals(t, err.Error(), expectedErrorMessage) +} + +func TestProberReadyStatusSetting(t *testing.T) { + component := "test" + testError := fmt.Errorf("test error") + p := NewProber(component, log.NewNopLogger()) + + p.SetReady() + err := p.IsReady() + testutil.Equals(t, err, nil) + p.SetNotReady(testError) + err = p.IsReady() + testutil.NotOk(t, err) +} + +func TestProberHeatlthyStatusSetting(t *testing.T) { + component 
:= "test" + testError := fmt.Errorf("test error") + p := NewProber(component, log.NewNopLogger()) + + p.SetHealthy() + err := p.IsHealthy() + testutil.Equals(t, err, nil) + p.SetNotHealthy(testError) + err = p.IsHealthy() + testutil.NotOk(t, err) +} + +func TestProberMuxRegistering(t *testing.T) { + component := "test" + ctx := context.Background() + var g run.Group + mux := http.NewServeMux() + + freePort, err := testutil.FreePort() + testutil.Ok(t, err) + serverAddress := fmt.Sprintf("localhost:%d", freePort) + + l, err := net.Listen("tcp", serverAddress) + testutil.Ok(t, err) + g.Add(func() error { + return errors.Wrap(http.Serve(l, mux), "serve probes") + }, func(error) {}) + + p := NewProbeInMux(component, mux, log.NewNopLogger()) + + go func() { _ = g.Run() }() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, healthyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, probeErrorHTTPStatus, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, readyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, probeErrorHTTPStatus, resp.StatusCode) + return err + })) + + p.SetHealthy() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, healthyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + +} + +func TestProberRouterRegistering(t *testing.T) { + component := "test" + router := route.New() + ctx := context.Background() + var g run.Group + mux := http.NewServeMux() + + freePort, err := testutil.FreePort() + testutil.Ok(t, err) + serverAddress := fmt.Sprintf("localhost:%d", freePort) + + l, err := net.Listen("tcp", serverAddress) + testutil.Ok(t, 
err) + g.Add(func() error { + return errors.Wrap(http.Serve(l, mux), "serve probes") + }, func(error) {}) + + p := NewProbeInRouter(component, router, log.NewNopLogger()) + mux.Handle("/", router) + + go func() { _ = g.Run() }() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, healthyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, probeErrorHTTPStatus, resp.StatusCode) + return err + })) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, readyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, probeErrorHTTPStatus, resp.StatusCode) + return err + })) + + p.SetHealthy() + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, healthyEndpointPath)) + testutil.Ok(t, err) + testutil.Equals(t, 200, resp.StatusCode) + return err + })) + +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index cbcc8e0189f..9296636b70e 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -174,13 +174,13 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return nil } -func(p *PrometheusStore) chunkSamples(series prompb.TimeSeries, samplesPerChunk int) ([]storepb.AggrChunk, error) { +func (p *PrometheusStore) chunkSamples(series prompb.TimeSeries, samplesPerChunk int) ([]storepb.AggrChunk, error) { var aggregatedChunks []storepb.AggrChunk samples := series.Samples for len(samples) > 0 { chunkSize := len(samples) - if chunkSize > samplesPerChunk { + if chunkSize > samplesPerChunk { chunkSize = samplesPerChunk }