Skip to content

Commit

Permalink
promclient: drop version check; use flags 404 to determine if Prometh…
Browse files Browse the repository at this point in the history
…eus supports it (#1163)

* promclient: drop version check

The version check would check the prometheus version and if > 2.2.0
would query the flags endpoint to do some extra validation.

Remove this for just checking the flags endpoints and if it responds do
the validation; otherwise log the failure, but continue the startup.

Signed-off-by: Miek Gieben <[email protected]>

* address nits from review

Signed-off-by: Miek Gieben <[email protected]>
  • Loading branch information
miekg authored and bwplotka committed May 27, 2019
1 parent 4eb764e commit 43102df
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 124 deletions.
52 changes: 21 additions & 31 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
version "github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
Expand Down Expand Up @@ -140,21 +139,6 @@ func runSidecar(
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Retry infinitely until we get Prometheus version.
if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if m.version, err = promclient.PromVersion(logger, m.promURL); err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "fetch Prometheus version")
}

return nil
}); err != nil {
return errors.Wrap(err, "fetch Prometheus version")
}

// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
Expand Down Expand Up @@ -321,20 +305,27 @@ func runSidecar(
}

func validatePrometheus(ctx context.Context, logger log.Logger, m *promMetadata) error {
if m.version == nil {
level.Warn(logger).Log("msg", "fetched version is nil or invalid. Unable to know whether Prometheus supports /version endpoint, skip validation")
return nil
flags := promclient.Flags{
TSDBMinTime: model.Duration(2 * time.Hour),
TSDBMaxTime: model.Duration(2 * time.Hour),
}

if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {

var err error
if flags, err = promclient.ConfiguredFlags(ctx, logger, m.promURL); err != nil {
if err == promclient.ErrFlagEndpointNotFound { // saw 404
level.Warn(logger).Log("msg", "failed to check Promteheus flags endpoint. No extra validation is done: %s", err)
return nil
}
level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", err)
return errors.Wrapf(err, "fetch Prometheus flags")
}

return nil
}

flags, err := promclient.ConfiguredFlags(ctx, logger, m.promURL)
if err != nil {
return errors.Wrap(err, "failed to check flags")
}); err != nil {
return errors.Wrapf(err, "fetch Prometheus flags")
}

// Check if compaction is disabled.
Expand All @@ -354,11 +345,10 @@ func validatePrometheus(ctx context.Context, logger log.Logger, m *promMetadata)
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
version *version.Version
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
Expand Down
75 changes: 16 additions & 59 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
version "github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand All @@ -36,7 +35,7 @@ import (
yaml "gopkg.in/yaml.v2"
)

var FlagsVersion = version.Must(version.NewVersion("2.2.0"))
var ErrFlagEndpointNotFound = errors.New("no flag endpoint found")

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
Expand Down Expand Up @@ -181,6 +180,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla
if err != nil {
return Flags{}, errors.Wrap(err, "create request")
}

resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return Flags{}, errors.Wrapf(err, "request config against %s", u.String())
Expand All @@ -192,18 +192,23 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla
return Flags{}, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
return Flags{}, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}
switch resp.StatusCode {
case 404:
return Flags{}, ErrFlagEndpointNotFound
case 200:
var d struct {
Data Flags `json:"data"`
}

var d struct {
Data Flags `json:"data"`
}
if err := json.Unmarshal(b, &d); err != nil {
return Flags{}, errors.Wrapf(err, "unmarshal response: %v", string(b))
if err := json.Unmarshal(b, &d); err != nil {
return Flags{}, errors.Wrapf(err, "unmarshal response: %v", string(b))
}

return d.Data, nil
default:
return Flags{}, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

return d.Data, nil
}

// Snapshot will request Prometheus to perform snapshot in directory returned by this function.
Expand Down Expand Up @@ -393,32 +398,6 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, warnings, nil
}

// PromVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func PromVersion(logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
resp, err := http.Get(u.String())
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

return parseVersion(b)
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Expand Down Expand Up @@ -521,25 +500,3 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
}
}
}

// parseVersion converts string to version.Version.
func parseVersion(data []byte) (*version.Version, error) {
var m struct {
Version string `json:"version"`
}
if err := json.Unmarshal(data, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(data))
}

// Prometheus is built with nil version.
if strings.TrimSpace(m.Version) == "" {
return nil, nil
}

ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse version %s", m.Version)
}

return ver, nil
}
34 changes: 0 additions & 34 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/go-kit/kit/log"
version "github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -148,36 +147,3 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestParseVersion(t *testing.T) {
promVersions := map[string]string{
"": promVersionResp(""),
"2.2.0": promVersionResp("2.2.0"),
"2.3.0": promVersionResp("2.3.0"),
"2.3.0-rc.0": promVersionResp("2.3.0-rc.0"),
}

promMalformedVersions := map[string]string{
"foo": promVersionResp("foo"),
"bar": promVersionResp("bar"),
}

for v, resp := range promVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.Ok(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}

for v, resp := range promMalformedVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.NotOk(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}
}

// promVersionResp returns the response of Prometheus /version endpoint.
func promVersionResp(ver string) string {
return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver)
}

0 comments on commit 43102df

Please sign in to comment.