From 51da5d928ec6edc7f767e28286461de21d808566 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Fri, 11 Jan 2019 19:27:23 +0000 Subject: [PATCH] Moved client prometheus code to separate promclient package. Nothing changed, except fixed IsWALDirAccessible (as WAL is actually a dir). Otherwise just moved code. Signed-off-by: Bartek Plotka --- cmd/thanos/rule.go | 120 +--------------- cmd/thanos/rule_test.go | 31 +--- cmd/thanos/sidecar.go | 67 +-------- cmd/thanos/sidecar_test.go | 38 ----- pkg/extprom/extprom.go | 2 + pkg/promclient/promclient.go | 200 ++++++++++++++++++++++++++ pkg/promclient/promclient_e2e_test.go | 83 +++++++++++ pkg/testutil/prometheus.go | 42 +++++- 8 files changed, 330 insertions(+), 253 deletions(-) delete mode 100644 cmd/thanos/sidecar_test.go create mode 100644 pkg/promclient/promclient.go create mode 100644 pkg/promclient/promclient_e2e_test.go diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index d5a22e45ef..09c5c28663 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -2,8 +2,6 @@ package main import ( "context" - "encoding/json" - "fmt" "math" "math/rand" "net" @@ -19,8 +17,6 @@ import ( "syscall" "time" - "github.com/improbable-eng/thanos/pkg/extprom" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" @@ -28,7 +24,9 @@ import ( "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/discovery/cache" "github.com/improbable-eng/thanos/pkg/discovery/dns" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" + "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" "github.com/improbable-eng/thanos/pkg/store" @@ -39,7 +37,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/discovery/file" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -271,7 +268,7 @@ func runRule( removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) for _, i := range rand.Perm(len(addrs)) { - vec, err := queryPrometheusInstant(ctx, logger, addrs[i], q, t) + vec, err := promclient.QueryInstant(ctx, logger, addrs[i], q, t) if err != nil { return nil, err } @@ -606,117 +603,6 @@ func runRule( return nil } -// Scalar response consists of array with mixed types so it needs to be -// unmarshaled separatelly. -func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) { - var ( - // Do not specify exact length of the expected slice since JSON unmarshaling - // would make the leght fit the size and we won't be able to check the length afterwards. - resultPointSlice []json.RawMessage - resultTime model.Time - resultValue model.SampleValue - ) - if err := json.Unmarshal(scalarJSONResult, &resultPointSlice); err != nil { - return nil, err - } - if len(resultPointSlice) != 2 { - return nil, errors.Errorf("invalid scalar result format %v, expected timestamp -> value tuple", resultPointSlice) - } - if err := json.Unmarshal(resultPointSlice[0], &resultTime); err != nil { - return nil, errors.Wrapf(err, "unmarshaling scalar time from %v", resultPointSlice) - } - if err := json.Unmarshal(resultPointSlice[1], &resultValue); err != nil { - return nil, errors.Wrapf(err, "unmarshaling scalar value from %v", resultPointSlice) - } - return model.Vector{&model.Sample{ - Metric: model.Metric{}, - Value: resultValue, - Timestamp: resultTime}}, nil -} - -func queryPrometheusInstant(ctx context.Context, logger log.Logger, addr, query string, t time.Time) (promql.Vector, error) { - u, err := url.Parse(fmt.Sprintf("http://%s/api/v1/query", addr)) - if err != nil { - return nil, err - } - params := url.Values{} - params.Add("query", query) - params.Add("time", t.Format(time.RFC3339Nano)) - params.Add("dedup", "true") - u.RawQuery = params.Encode() - - req, err := http.NewRequest("GET", u.String(), nil) - if err != nil { - return nil, err - } - - span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]") - defer span.Finish() - - req = req.WithContext(ctx) - - client := &http.Client{ - Transport: tracing.HTTPTripperware(logger, http.DefaultTransport), - } - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") - - // Decode only ResultType and load Result only as RawJson since we don't know - // structure of the Result yet. - var m struct { - Data struct { - ResultType string `json:"resultType"` - Result json.RawMessage `json:"result"` - } `json:"data"` - } - - if err = json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, err - } - - var vectorResult model.Vector - - // Decode the Result depending on the ResultType - // Currently only `vector` and `scalar` types are supported - switch m.Data.ResultType { - case promql.ValueTypeVector: - if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil { - return nil, err - } - case promql.ValueTypeScalar: - vectorResult, err = convertScalarJSONToVector(m.Data.Result) - if err != nil { - return nil, err - } - default: - return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) - } - - vec := make(promql.Vector, 0, len(vectorResult)) - - for _, e := range vectorResult { - lset := make(promlabels.Labels, 0, len(e.Metric)) - - for k, v := range e.Metric { - lset = append(lset, promlabels.Label{ - Name: string(k), - Value: string(v), - }) - } - sort.Sort(lset) - - vec = append(vec, promql.Sample{ - Metric: lset, - Point: promql.Point{T: int64(e.Timestamp), V: float64(e.Value)}, - }) - } - - return vec, nil -} - type alertmanagerSet struct { resolver dns.Resolver addrs []string diff --git a/cmd/thanos/rule_test.go b/cmd/thanos/rule_test.go index 9020f4614a..7f37f07596 100644 --- a/cmd/thanos/rule_test.go +++ b/cmd/thanos/rule_test.go @@ -8,39 +8,12 @@ import ( "github.com/improbable-eng/thanos/pkg/discovery/dns" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" - "github.com/prometheus/common/model" ) -func TestRule_UnmarshalScalarResponse(t *testing.T) { - var ( - scalarJSONResult = []byte(`[1541196373.677,"1"]`) - invalidLengthScalarJSONResult = []byte(`[1541196373.677,"1", "nonsence"]`) - invalidDataScalarJSONResult = []byte(`["foo","bar"]`) - - vectorResult model.Vector - expectedVector = model.Vector{&model.Sample{ - Metric: model.Metric{}, - Value: 1, - Timestamp: model.Time(1541196373677)}} - ) - // Test valid input. - vectorResult, err := convertScalarJSONToVector(scalarJSONResult) - testutil.Ok(t, err) - testutil.Equals(t, vectorResult.String(), expectedVector.String()) - - // Test invalid length of scalar data structure. - vectorResult, err = convertScalarJSONToVector(invalidLengthScalarJSONResult) - testutil.NotOk(t, err) - - // Test invalid format of scalar data. - vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult) - testutil.NotOk(t, err) -} - func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) { mockResolver := mockResolver{ resultIPs: map[string][]string{ - "alertmanager.com:9093": []string{"1.1.1.1:9300"}, + "alertmanager.com:9093": {"1.1.1.1:9300"}, }, } am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} @@ -62,7 +35,7 @@ func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) { func TestRule_AlertmanagerResolveWithPort(t *testing.T) { mockResolver := mockResolver{ resultIPs: map[string][]string{ - "alertmanager.com:19093": []string{"1.1.1.1:9300"}, + "alertmanager.com:19093": {"1.1.1.1:9300"}, }, } am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 3d24c31db5..9e48be809f 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -2,15 +2,10 @@ package main import ( "context" - "encoding/json" - "io/ioutil" "math" "net" "net/http" "net/url" - "os" - "path" - "path/filepath" "sync" "time" @@ -19,6 +14,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" + "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/reloader" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" @@ -31,7 +27,6 @@ import ( "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" "gopkg.in/alecthomas/kingpin.v2" - "gopkg.in/yaml.v2" ) func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) { @@ -253,7 +248,7 @@ func runSidecar( } }() - if err := isPrometheusDirAccesible(dataDir); err != nil { + if err := promclient.IsWALDirAccesible(dataDir); err != nil { level.Error(logger).Log("err", err) } @@ -286,20 +281,6 @@ func runSidecar( return nil } -func isPrometheusDirAccesible(dir string) error { - const errMsg = "WAL file is not accessible. Does block shipper dir is a TSDB directory? If yes it is shared with TSDB?" - f, err := os.Stat(filepath.Join(dir, "WAL")) - if err != nil { - return errors.Wrap(err, errMsg) - } - - if f.IsDir() { - return errors.New(errMsg) - } - - return nil -} - type metadata struct { promURL *url.URL @@ -310,7 +291,7 @@ type metadata struct { } func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error { - elset, err := queryExternalLabels(ctx, logger, s.promURL) + elset, err := promclient.ExternalLabels(ctx, logger, s.promURL) if err != nil { return err } @@ -357,45 +338,3 @@ func (s *metadata) Timestamps() (mint int64, maxt int64) { return s.mint, s.maxt } - -func queryExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labels.Labels, error) { - u := *base - u.Path = path.Join(u.Path, "/api/v1/status/config") - - req, err := http.NewRequest("GET", u.String(), nil) - if err != nil { - return nil, errors.Wrap(err, "create request") - } - resp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, errors.Wrapf(err, "request config against %s", u.String()) - } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") - - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, errors.Errorf("failed to read body") - } - - if resp.StatusCode != 200 { - return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b)) - } - - var d struct { - Data struct { - YAML string `json:"yaml"` - } `json:"data"` - } - if err := json.Unmarshal(b, &d); err != nil { - return nil, errors.Wrapf(err, "unmarshal response: %v", string(b)) - } - var cfg struct { - Global struct { - ExternalLabels map[string]string `yaml:"external_labels"` - } `yaml:"global"` - } - if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil { - return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML) - } - return labels.FromMap(cfg.Global.ExternalLabels), nil -} diff --git a/cmd/thanos/sidecar_test.go b/cmd/thanos/sidecar_test.go deleted file mode 100644 index 991a95047c..0000000000 --- a/cmd/thanos/sidecar_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "context" - "net/url" - "testing" - - "fmt" - - "github.com/go-kit/kit/log" - "github.com/improbable-eng/thanos/pkg/testutil" -) - -func TestSidecar_queryExternalLabels(t *testing.T) { - p, err := testutil.NewPrometheus() - testutil.Ok(t, err) - - err = p.SetConfig(` -global: - external_labels: - region: eu-west - az: 1 -`) - testutil.Ok(t, err) - - testutil.Ok(t, p.Start()) - defer func() { testutil.Ok(t, p.Stop()) }() - - u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) - testutil.Ok(t, err) - - ext, err := queryExternalLabels(context.Background(), log.NewNopLogger(), u) - testutil.Ok(t, err) - - testutil.Equals(t, 2, len(ext)) - testutil.Equals(t, "eu-west", ext.Get("region")) - testutil.Equals(t, "1", ext.Get("az")) -} diff --git a/pkg/extprom/extprom.go b/pkg/extprom/extprom.go index 5dc14e777a..4489119961 100644 --- a/pkg/extprom/extprom.go +++ b/pkg/extprom/extprom.go @@ -1,3 +1,5 @@ +// Package extprom is covering code that is used for extending native Prometheus packages functionality. + package extprom import ( diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go new file mode 100644 index 0000000000..8873788db1 --- /dev/null +++ b/pkg/promclient/promclient.go @@ -0,0 +1,200 @@ +// Package promclient offers helper client function for various API endpoints. + +package promclient + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "sort" + "time" + + "github.com/improbable-eng/thanos/pkg/tracing" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/pkg/errors" + promlabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/tsdb/labels" + "gopkg.in/yaml.v2" +) + +// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell +// if we have access to Prometheus TSDB directory. +func IsWALDirAccesible(dir string) error { + const errMsg = "WAL dir is not accessible. Is this dir a TSDB directory? If yes it is shared with TSDB?" + + f, err := os.Stat(filepath.Join(dir, "wal")) + if err != nil { + return errors.Wrap(err, errMsg) + } + + if !f.IsDir() { + return errors.New(errMsg) + } + + return nil +} + +// ExternalLabels returns external labels from /api/v1/status/config Prometheus endpoint. +// Note that configuration can be hot reloadable on Prometheus, so this config might change in runtime. +func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labels.Labels, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/status/config") + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return nil, errors.Wrap(err, "create request") + } + resp, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, errors.Wrapf(err, "request config against %s", u.String()) + } + defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("failed to read body") + } + + if resp.StatusCode != 200 { + return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b)) + } + + var d struct { + Data struct { + YAML string `json:"yaml"` + } `json:"data"` + } + if err := json.Unmarshal(b, &d); err != nil { + return nil, errors.Wrapf(err, "unmarshal response: %v", string(b)) + } + var cfg struct { + Global struct { + ExternalLabels map[string]string `yaml:"external_labels"` + } `yaml:"global"` + } + if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil { + return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML) + } + return labels.FromMap(cfg.Global.ExternalLabels), nil +} + +func QueryInstant(ctx context.Context, logger log.Logger, addr, query string, t time.Time) (promql.Vector, error) { + u, err := url.Parse(fmt.Sprintf("http://%s/api/v1/query", addr)) + if err != nil { + return nil, err + } + params := url.Values{} + params.Add("query", query) + params.Add("time", t.Format(time.RFC3339Nano)) + params.Add("dedup", "true") + u.RawQuery = params.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + + span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]") + defer span.Finish() + + req = req.WithContext(ctx) + + client := &http.Client{ + Transport: tracing.HTTPTripperware(logger, http.DefaultTransport), + } + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + + // Decode only ResultType and load Result only as RawJson since we don't know + // structure of the Result yet. + var m struct { + Data struct { + ResultType string `json:"resultType"` + Result json.RawMessage `json:"result"` + } `json:"data"` + } + + if err = json.NewDecoder(resp.Body).Decode(&m); err != nil { + return nil, err + } + + var vectorResult model.Vector + + // Decode the Result depending on the ResultType + // Currently only `vector` and `scalar` types are supported + switch m.Data.ResultType { + case promql.ValueTypeVector: + if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil { + return nil, err + } + case promql.ValueTypeScalar: + vectorResult, err = convertScalarJSONToVector(m.Data.Result) + if err != nil { + return nil, err + } + default: + return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) + } + + vec := make(promql.Vector, 0, len(vectorResult)) + + for _, e := range vectorResult { + lset := make(promlabels.Labels, 0, len(e.Metric)) + + for k, v := range e.Metric { + lset = append(lset, promlabels.Label{ + Name: string(k), + Value: string(v), + }) + } + sort.Sort(lset) + + vec = append(vec, promql.Sample{ + Metric: lset, + Point: promql.Point{T: int64(e.Timestamp), V: float64(e.Value)}, + }) + } + + return vec, nil +} + +// Scalar response consists of array with mixed types so it needs to be +// unmarshaled separately. +func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) { + var ( + // Do not specify exact length of the expected slice since JSON unmarshaling + // would make the leght fit the size and we won't be able to check the length afterwards. + resultPointSlice []json.RawMessage + resultTime model.Time + resultValue model.SampleValue + ) + if err := json.Unmarshal(scalarJSONResult, &resultPointSlice); err != nil { + return nil, err + } + if len(resultPointSlice) != 2 { + return nil, errors.Errorf("invalid scalar result format %v, expected timestamp -> value tuple", resultPointSlice) + } + if err := json.Unmarshal(resultPointSlice[0], &resultTime); err != nil { + return nil, errors.Wrapf(err, "unmarshaling scalar time from %v", resultPointSlice) + } + if err := json.Unmarshal(resultPointSlice[1], &resultValue); err != nil { + return nil, errors.Wrapf(err, "unmarshaling scalar value from %v", resultPointSlice) + } + return model.Vector{&model.Sample{ + Metric: model.Metric{}, + Value: resultValue, + Timestamp: resultTime}}, nil +} diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go new file mode 100644 index 0000000000..9262687a85 --- /dev/null +++ b/pkg/promclient/promclient_e2e_test.go @@ -0,0 +1,83 @@ +package promclient + +import ( + "context" + "fmt" + "net/url" + "path" + "testing" + "time" + + "github.com/prometheus/common/model" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/testutil" +) + +func TestIsWALFileAccesible_e2e(t *testing.T) { + p, err := testutil.NewPrometheus() + testutil.Ok(t, err) + + testutil.Ok(t, p.Start()) + defer func() { testutil.Ok(t, p.Stop()) }() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { return IsWALDirAccesible(p.Dir()) })) + + testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/non-existing"))) + testutil.NotOk(t, IsWALDirAccesible(path.Join(p.Dir(), "/../"))) +} + +func TestExternalLabels_e2e(t *testing.T) { + p, err := testutil.NewPrometheus() + testutil.Ok(t, err) + + err = p.SetConfig(` +global: + external_labels: + region: eu-west + az: 1 +`) + testutil.Ok(t, err) + + testutil.Ok(t, p.Start()) + defer func() { testutil.Ok(t, p.Stop()) }() + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + ext, err := ExternalLabels(context.Background(), log.NewNopLogger(), u) + testutil.Ok(t, err) + + testutil.Equals(t, 2, len(ext)) + testutil.Equals(t, "eu-west", ext.Get("region")) + testutil.Equals(t, "1", ext.Get("az")) +} + +func TestRule_UnmarshalScalarResponse(t *testing.T) { + var ( + scalarJSONResult = []byte(`[1541196373.677,"1"]`) + invalidLengthScalarJSONResult = []byte(`[1541196373.677,"1", "nonsence"]`) + invalidDataScalarJSONResult = []byte(`["foo","bar"]`) + + vectorResult model.Vector + expectedVector = model.Vector{&model.Sample{ + Metric: model.Metric{}, + Value: 1, + Timestamp: model.Time(1541196373677)}} + ) + // Test valid input. + vectorResult, err := convertScalarJSONToVector(scalarJSONResult) + testutil.Ok(t, err) + testutil.Equals(t, vectorResult.String(), expectedVector.String()) + + // Test invalid length of scalar data structure. + vectorResult, err = convertScalarJSONToVector(invalidLengthScalarJSONResult) + testutil.NotOk(t, err) + + // Test invalid format of scalar data. + vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult) + testutil.NotOk(t, err) +} diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index 807b61f8a0..c0e9cb8d24 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -124,6 +124,7 @@ func (p *Prometheus) Start() error { "--storage.tsdb.path="+p.db.Dir(), "--web.listen-address="+p.addr, "--web.route-prefix="+p.prefix, + "--web.enable-admin-api", "--config.file="+filepath.Join(p.db.Dir(), "prometheus.yml"), ) go func() { @@ -137,12 +138,17 @@ func (p *Prometheus) Start() error { return nil } -// Addr gets correct address after Start method. +// Dir returns TSDB dir. +func (p *Prometheus) Dir() string { + return p.dir +} + +// Addr returns correct address after Start method. func (p *Prometheus) Addr() string { return p.addr + p.prefix } -// SetConfig updates the contents of the config file. +// SetConfig updates the contents of the config file. By default it is empty. func (p *Prometheus) SetConfig(s string) (err error) { f, err := os.Create(filepath.Join(p.dir, "prometheus.yml")) if err != nil { @@ -157,7 +163,7 @@ func (p *Prometheus) SetConfig(s string) (err error) { // Stop terminates Prometheus and clean up its data directory. func (p *Prometheus) Stop() error { if err := p.cmd.Process.Signal(syscall.SIGTERM); err != nil { - return errors.Wrapf(err, "failed to Prometheus. Kill it manually and cleanr %s dir", p.db.Dir()) + return errors.Wrapf(err, "failed to Prometheus. Kill it manually and clean %s dir", p.db.Dir()) } time.Sleep(time.Second / 2) @@ -187,6 +193,30 @@ func CreateBlock( mint, maxt int64, extLset labels.Labels, resolution int64, +) (id ulid.ULID, err error) { + return createBlock(dir, series, numSamples, mint, maxt, extLset, resolution, false) +} + +// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block. +func CreateBlockWithTombstone( + dir string, + series []labels.Labels, + numSamples int, + mint, maxt int64, + extLset labels.Labels, + resolution int64, +) (id ulid.ULID, err error) { + return createBlock(dir, series, numSamples, mint, maxt, extLset, resolution, true) +} + +func createBlock( + dir string, + series []labels.Labels, + numSamples int, + mint, maxt int64, + extLset labels.Labels, + resolution int64, + tombstones bool, ) (id ulid.ULID, err error) { h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000) if err != nil { @@ -251,8 +281,10 @@ func CreateBlock( return id, errors.Wrap(err, "finalize block") } - if err = os.Remove(filepath.Join(dir, id.String(), "tombstones")); err != nil { - return id, errors.Wrap(err, "remove tombstones") + if !tombstones { + if err = os.Remove(filepath.Join(dir, id.String(), "tombstones")); err != nil { + return id, errors.Wrap(err, "remove tombstones") + } } return id, nil