Skip to content

Commit

Permalink
add function to get prometheus version and check if compaction is dia…
Browse files Browse the repository at this point in the history
…bled.
  • Loading branch information
yeya24 committed Feb 21, 2019
1 parent 8622888 commit b0ce5e1
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 20 deletions.
11 changes: 10 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,25 @@ func runSidecar(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Get Prometheus version.
ver, err := promclient.GetPrometheusVersion(ctx, logger, promURL)
if err != nil {
return errors.Wrapf(err, "get Prometheus version")
}
// Check if Prometheus has /status/flags endpoint.
if ver.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint. Skip validation", "version", ver.Original())
} else {
// Check if Prometheus disables compaction.
if err := promclient.ValidateCompaction(ctx, logger, promURL); err != nil {
return errors.Wrap(err, "validate Prometheus compaction")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -294,6 +310,7 @@ func runSidecar(
return nil
}


type promMetadata struct {
promURL *url.URL

Expand Down
64 changes: 64 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/hashicorp/go-version"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -28,6 +29,13 @@ import (
"gopkg.in/yaml.v2"
)

// flagsVersion is the version that Prometheus started to have /status/flags endpoint
var FlagsVersion *version.Version

func init() {
FlagsVersion, _ = version.NewVersion("2.2.0")
}

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
func IsWALDirAccesible(dir string) error {
Expand Down Expand Up @@ -335,6 +343,62 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, nil
}

// GetPrometheusVersion returns the version of Prometheus.
func GetPrometheusVersion(ctx context.Context, logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "create request")
}

resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}
var m struct {
Version string `json:"version"`
}
if err := json.Unmarshal(b, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(b))
}
ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to init version %s", m.Version)
}

return ver, nil
}

// ValidateCompaction ensures that Prometheus disables compaction.
func ValidateCompaction(ctx context.Context, logger log.Logger, promURL *url.URL) error {
flags, err := ConfiguredFlags(ctx, logger, promURL)
if err != nil {
return errors.Wrap(err, "configured flags; failed to check if compaction is disabled")
}
// Check if min-block-time and max-block-time are equal to 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) || flags.TSDBMaxTime != model.Duration(2*time.Hour) {
return errors.Errorf("Found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration = 2h)", flags.TSDBMaxTime, flags.TSDBMinTime)
}

return nil
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Expand Down
38 changes: 20 additions & 18 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ package shipper
import (
"context"
"encoding/json"
"io/ioutil"
"math"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
Expand All @@ -24,10 +15,16 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"io/ioutil"
"math"
"net/url"
"os"
"path"
"path/filepath"
"sort"
)

type metrics struct {
Expand Down Expand Up @@ -135,15 +132,20 @@ func NewWithCompacted(
if lbls == nil {
lbls = func() labels.Labels { return nil }
}

flags, err := promclient.ConfiguredFlags(ctx, logger, prometheusURL)
// Get Prometheus version.
ver, err := promclient.GetPrometheusVersion(ctx, logger, prometheusURL)
if err != nil {
return nil, errors.Wrap(err, "configured flags; failed to check if compaction is disabled")
}

if flags.TSDBMinTime != model.Duration(2*time.Hour) || flags.TSDBMaxTime != model.Duration(2*time.Hour) {
return nil, errors.Errorf("Found that TSDB Max time is %s and Min time is %s. To use shipper with upload compacted option, "+
"compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration = 2h", flags.TSDBMinTime, flags.TSDBMaxTime)
return nil, errors.Wrapf(err, "get Prometheus version")
}
// Check if Prometheus has /status/flags endpoint.
if ver.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint. Skip validation", "version", ver.Original())
} else {
// Check if Prometheus disables compaction.
if err := promclient.ValidateCompaction(ctx, logger, prometheusURL); err != nil {
return nil, errors.Wrap(err, "validate Prometheus compaction")
}
}

return &Shipper{
Expand Down

0 comments on commit b0ce5e1

Please sign in to comment.