Skip to content

Commit

Permalink
promclient: Added methods for Prometheus snapshot and flags.
Browse files Browse the repository at this point in the history
Also:
* Resigned from tests against 2.0.0 version as in README.
* Added foreach Prometheus version testutil.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Feb 1, 2019
1 parent 85b4394 commit e15f983
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 71 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- S3 provider:
- Added `put_user_metadata` option to config.
- Added `insecure_skip_verify` option to config.


### Deprecated

- Tests against Prometheus below v2.2.1. This does not mean *lack* of support for those. Only that we don't tests the compatibility anymore. See [#758](https://github.com/improbable-eng/thanos/issues/758) for details.

## [v0.2.1](https://github.com/improbable-eng/thanos/releases/tag/v0.2.1) - 2018.12.27

### Added
Expand Down
8 changes: 3 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PROTOC_VERSION ?= 3.4.0
# Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus

# Limitied prom version, because testing was not possibe. This should fix it: https://github.com/improbable-eng/thanos/issues/758
SUPPORTED_PROM_VERSIONS ?=v2.4.3 v2.5.0
PROM_VERSIONS ?=v2.4.3 v2.5.0
ALERTMANAGER_VERSION ?=v0.15.2
MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z

Expand Down Expand Up @@ -157,16 +157,14 @@ tarballs-release: $(PROMU)
.PHONY: test
test: test-deps
@echo ">> running all tests. Do export THANOS_SKIP_GCS_TESTS='true' or/and THANOS_SKIP_S3_AWS_TESTS='true' or/and THANOS_SKIP_AZURE_TESTS='true' and/or THANOS_SKIP_SWIFT_TESTS='true' and/or THANOS_SKIP_TENCENT_COS_TESTS='true' if you want to skip e2e tests against real store buckets"
@for ver in $(SUPPORTED_PROM_VERSIONS); do \
THANOS_TEST_PROMETHEUS_PATH="prometheus-$$ver" THANOS_TEST_ALERTMANAGER_PATH="alertmanager-$(ALERTMANAGER_VERSION)" go test $(shell go list ./... | grep -v /vendor/ | grep -v /benchmark/); \
done
THANOS_TEST_PROMETHEUS_VERSIONS="$(PROM_VERSIONS)" THANOS_TEST_ALERTMANAGER_PATH="alertmanager-$(ALERTMANAGER_VERSION)" go test $(shell go list ./... | grep -v /vendor/ | grep -v /benchmark/);

# test-deps installs dependency for e2e tets.
# It installs current Thanos, supported versions of Prometheus and alertmanager to test against in e2e.
.PHONY: test-deps
test-deps: deps
@go install github.com/improbable-eng/thanos/cmd/thanos
$(foreach ver,$(SUPPORTED_PROM_VERSIONS),$(call fetch_go_bin_version,github.com/prometheus/prometheus/cmd/prometheus,$(ver)))
$(foreach ver,$(PROM_VERSIONS),$(call fetch_go_bin_version,github.com/prometheus/prometheus/cmd/prometheus,$(ver)))
$(call fetch_go_bin_version,github.com/prometheus/alertmanager/cmd/alertmanager,$(ALERTMANAGER_VERSION))
$(call fetch_go_bin_version,github.com/minio/minio,$(MINIO_SERVER_VERSION))

Expand Down
164 changes: 159 additions & 5 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ import (
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/tsdb/labels"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
}
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, errors.Wrapf(err, "request config against %s", u.String())
return nil, errors.Wrapf(err, "request flags against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

Expand Down Expand Up @@ -88,6 +89,159 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
return labels.FromMap(cfg.Global.ExternalLabels), nil
}

type Flags struct {
TSDBPath string `json:"storage.tsdb.path"`
TSDBRetention model.Duration `json:"storage.tsdb.retention"`
TSDBMinTime model.Duration `json:"storage.tsdb.min-block-duration"`
TSDBMaxTime model.Duration `json:"storage.tsdb.max-block-duration"`
WebEnableAdminAPI bool `json:"web.enable-admin-api"`
WebEnableLifecycle bool `json:"web.enable-lifecycle"`
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (f *Flags) UnmarshalJSON(b []byte) error {
// TODO(bwplotka): Avoid this custom unmarshal by:
// - prometheus/common: adding unmarshalJSON to modelDuration
// - prometheus/prometheus: flags should return proper JSON. (not bool in string)
parsableFlags := struct {
TSDBPath string `json:"storage.tsdb.path"`
TSDBRetention modelDuration `json:"storage.tsdb.retention"`
TSDBMinTime modelDuration `json:"storage.tsdb.min-block-duration"`
TSDBMaxTime modelDuration `json:"storage.tsdb.max-block-duration"`
WebEnableAdminAPI modelBool `json:"web.enable-admin-api"`
WebEnableLifecycle modelBool `json:"web.enable-lifecycle"`
}{}

if err := json.Unmarshal(b, &parsableFlags); err != nil {
return err
}

*f = Flags{
TSDBPath: parsableFlags.TSDBPath,
TSDBRetention: model.Duration(parsableFlags.TSDBRetention),
TSDBMinTime: model.Duration(parsableFlags.TSDBMinTime),
TSDBMaxTime: model.Duration(parsableFlags.TSDBMaxTime),
WebEnableAdminAPI: bool(parsableFlags.WebEnableAdminAPI),
WebEnableLifecycle: bool(parsableFlags.WebEnableLifecycle),
}
return nil
}

type modelDuration model.Duration

// UnmarshalJSON implements the json.Unmarshaler interface.
func (d *modelDuration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}

dur, err := model.ParseDuration(s)
if err != nil {
return err
}
*d = modelDuration(dur)
return nil
}

type modelBool bool

// UnmarshalJSON implements the json.Unmarshaler interface.
func (m *modelBool) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}

boolean, err := strconv.ParseBool(s)
if err != nil {
return err
}
*m = modelBool(boolean)
return nil
}

// ConfiguredFlags some configured flags from /api/v1/status/flags Prometheus endpoint.
// Added to Prometheus from v2.2.
func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Flags, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/status/flags")

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return Flags{}, errors.Wrap(err, "create request")
}
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return Flags{}, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Flags{}, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return Flags{}, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Data Flags `json:"data"`
}
if err := json.Unmarshal(b, &d); err != nil {
return Flags{}, errors.Wrapf(err, "unmarshal response: %v", string(b))
}

return d.Data, nil
}

// Snapshot will request Prometheus to perform snapshot in directory returned by this function.
// Returned directory is relative to Prometheus data-dir.
// NOTE: `--web.enable-admin-api` flag has to be set on Prometheus.
// Added to Prometheus from v2.1.
// TODO(bwplotka): Add metrics.
func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bool) (string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/admin/tsdb/snapshot")

req, err := http.NewRequest(
http.MethodPost,
u.String(),
strings.NewReader(url.Values{"skip_head": []string{strconv.FormatBool(skipHead)}}.Encode()),
)
if err != nil {
return "", errors.Wrap(err, "create request")
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", errors.Wrapf(err, "request snapshot against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return "", errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Data struct {
Name string `json:"name"`
} `json:"data"`
}
if err := json.Unmarshal(b, &d); err != nil {
return "", errors.Wrapf(err, "unmarshal response: %v", string(b))
}

return path.Join("snapshots", d.Data.Name), nil
}

// QueryInstant performs instant query and returns results in model.Vector type.
func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, dedup bool) (model.Vector, error) {
if logger == nil {
Expand Down
115 changes: 88 additions & 27 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,118 @@ package promclient
import (
"context"
"fmt"
"io/ioutil"
"net/url"
"os"
"path"
"testing"
"time"

"github.com/prometheus/common/model"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
)

func TestIsWALFileAccesible_e2e(t *testing.T) {
p, err := testutil.NewPrometheus()
testutil.Ok(t, err)
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { return IsWALDirAccesible(p.Dir()) }))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { return IsWALDirAccesible(p.Dir()) }))

testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/non-existing")))
testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/../")))
testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/non-existing")))
testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/../")))
})
}

func TestExternalLabels_e2e(t *testing.T) {
p, err := testutil.NewPrometheus()
testutil.Ok(t, err)

err = p.SetConfig(`
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
testutil.Ok(t, p.SetConfig(`
global:
external_labels:
region: eu-west
az: 1
`)
testutil.Ok(t, err)
`))

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()
testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

ext, err := ExternalLabels(context.Background(), log.NewNopLogger(), u)
testutil.Ok(t, err)
ext, err := ExternalLabels(context.Background(), log.NewNopLogger(), u)
testutil.Ok(t, err)

testutil.Equals(t, 2, len(ext))
testutil.Equals(t, "eu-west", ext.Get("region"))
testutil.Equals(t, "1", ext.Get("az"))
})
}

func TestConfiguredFlags_e2e(t *testing.T) {
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

flags, err := ConfiguredFlags(context.Background(), log.NewNopLogger(), u)
testutil.Ok(t, err)

testutil.Assert(t, flags.WebEnableAdminAPI, "")
testutil.Assert(t, !flags.WebEnableLifecycle, "")
testutil.Equals(t, p.Dir(), flags.TSDBPath)
testutil.Equals(t, int64(2*time.Hour), int64(flags.TSDBMinTime))
testutil.Equals(t, int64(36*time.Hour), int64(flags.TSDBMaxTime))
testutil.Equals(t, int64(15*24*time.Hour), int64(flags.TSDBRetention))
})
}

func TestSnapshot_e2e(t *testing.T) {
testutil.ForeachPrometheus(t, func(t testing.TB, p *testutil.Prometheus) {
now := time.Now()

// Create artificial block.
id, err := testutil.CreateBlockWithTombstone(
p.Dir(),
[]labels.Labels{labels.FromStrings("a", "b")},
10,
timestamp.FromTime(now.Add(-6*time.Hour)),
timestamp.FromTime(now.Add(-4*time.Hour)),
nil,
0,
)
testutil.Ok(t, err)

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

dir, err := Snapshot(context.Background(), log.NewNopLogger(), u, false)
testutil.Ok(t, err)

_, err = os.Stat(path.Join(p.Dir(), dir, id.String()))
testutil.Ok(t, err)

files, err := ioutil.ReadDir(path.Join(p.Dir(), dir))
testutil.Ok(t, err)

for _, f := range files {
_, err := ulid.Parse(f.Name())
testutil.Ok(t, err)
}

testutil.Equals(t, 2, len(ext))
testutil.Equals(t, "eu-west", ext.Get("region"))
testutil.Equals(t, "1", ext.Get("az"))
testutil.Equals(t, 2, len(files))
})
}

func TestRule_UnmarshalScalarResponse(t *testing.T) {
Expand Down
Loading

0 comments on commit e15f983

Please sign in to comment.