Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive,ruler: Upgraded TSDB and used ChunkIterator in Series TSDB Store. #2876

Merged
Merged 1 commit into the base branch on Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel.
- [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatcher[]` to select matching stores when debugging the querier. See [documentation](https://thanos.io/components/query.md/#store-filtering)
- [#2991](https://github.com/thanos-io/thanos/pull/2991) store: `operation` label value `getrange` changed to `get_range` for `thanos_store_bucket_cache_operation_requests_total` and `thanos_store_bucket_cache_operation_hits_total` to be consistent with bucket operation metrics.
- [#2876](https://github.com/thanos-io/thanos/pull/2876) Receive,Ruler: Updated TSDB and switched to ChunkIterators instead of sample one, which avoids unnecessary decoding / encoding.

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

Expand Down
1 change: 0 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}

flagsMap := getFlagsMap(cmd.Model().Flags)

return runQuery(
g,
logger,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ require (
// so that we don't get errors about being incompatible with the Go proxies.
// See https://github.com/thanos-io/thanos/issues/1415
replace (
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159
google.golang.org/grpc => google.golang.org/grpc v1.29.1
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5JsbBv6wSv6i0=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.11.1 h1:0ZISXCMRuCZcxF77aT1BXY5m74mX2vrGYl1dSwBI0Jo=
github.com/prometheus/common v0.11.1/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
Expand All @@ -876,8 +875,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 h1:eQ88y1vfbXuclr6B04jYTmhc6ydXlBUSIaXCjEs0osk=
github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2/go.mod h1:i1KZsZmyDTJRvnR7zE8z/u2v+tkpPjoiPpnWp6nwhr0=
github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 h1:mT66e//l/+QugUat5A42YvIPxlsS6O/yr9UtjlDhPYw=
github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159/go.mod h1:zfAqy/MwhMFajB9E2n12/9gG2fvofIE9uKDtlZCDxqs=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
Expand Down
2 changes: 1 addition & 1 deletion pkg/rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (m *Manager) Stop() {
mgr.Stop()
}
}

func (m *Manager) protoRuleGroups() []*rulespb.RuleGroup {

rg := m.RuleGroups()
res := make([]*rulespb.RuleGroup, 0, len(rg))
for _, g := range rg {
Expand Down
7 changes: 3 additions & 4 deletions pkg/rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (n nopQueryable) Querier(_ context.Context, _, _ int64) (storage.Querier, e
}

// Regression test against https://github.com/thanos-io/thanos/issues/1779.
func TestRun(t *testing.T) {
func TestRun_Subqueries(t *testing.T) {
dir, err := ioutil.TempDir("", "test_rule_run")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
Expand Down Expand Up @@ -84,17 +84,16 @@ groups:
},
labels.FromStrings("replica", "1"),
)
testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))
testutil.Ok(t, thanosRuleMgr.Update(1*time.Second, []string{filepath.Join(dir, "rule.yaml")}))

thanosRuleMgr.Run()
defer thanosRuleMgr.Stop()

select {
case <-time.After(2 * time.Minute):
case <-time.After(1 * time.Minute):
t.Fatal("timeout while waiting on rule manager query evaluation")
case <-queryDone:
}

testutil.Equals(t, "rate(some_metric[1h:5m] offset 1d)", query)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/storepb/testutil/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
testutil.Ok(t, err)
}

h, err := tsdb.NewHead(nil, nil, w, 10000000, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
testutil.Ok(t, err)

app := h.Appender(context.Background())
Expand Down
124 changes: 60 additions & 64 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -24,18 +23,19 @@ import (
)

type TSDBReader interface {
storage.Queryable
storage.ChunkQueryable
StartTime() (int64, error)
}

// TSDBStore implements the store API against a local TSDB instance.
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
type TSDBStore struct {
logger log.Logger
db TSDBReader
component component.StoreAPI
externalLabels labels.Labels
logger log.Logger
db TSDBReader
component component.StoreAPI
externalLabels labels.Labels
maxBytesPerFrame int
}

// ReadWriteTSDBStore is a TSDBStore that can also be written to.
Expand All @@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com
logger = log.NewNopLogger()
}
return &TSDBStore{
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC.
}
}

Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Error(codes.InvalidArgument, err.Error())
}

q, err := s.db.Querier(context.Background(), r.MinTime, r.MaxTime)
q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
Expand All @@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
set = q.Select(false, nil, matchers...)
respSeries storepb.Series
)

// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for set.Next() {
series := set.At()

respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)

if !r.SkipChunks {
// TODO(fabxc): An improvement over this trivial approach would be to directly
// use the chunks provided by TSDB in the response.
c, err := s.encodeChunks(series.Iterator(), MaxSamplesPerChunk)
if err != nil {
return status.Errorf(codes.Internal, "encode chunk: %s", err)
respSeries.Chunks = respSeries.Chunks[:0]
if r.SkipChunks {
if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
}

respSeries.Chunks = append(respSeries.Chunks[:0], c...)
continue
}

if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
frameBytesLeft := s.maxBytesPerFrame
for _, lbl := range respSeries.Labels {
frameBytesLeft -= lbl.Size()
}
}
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}

func (s *TSDBStore) encodeChunks(it chunkenc.Iterator, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
var (
chkMint int64
chk *chunkenc.XORChunk
app chunkenc.Appender
isNext = it.Next()
)

for isNext {
if chk == nil {
chk = chunkenc.NewXORChunk()
app, err = chk.Appender()
if err != nil {
return nil, err
chIter := series.Iterator()
isNext := chIter.Next()
for isNext {
chk := chIter.At()
if chk.Chunk == nil {
return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}
chkMint, _ = it.At()
}

app.Append(it.At())
chkMaxt, _ := it.At()
respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{
MinTime: chk.MinTime,
MaxTime: chk.MaxTime,
Raw: &storepb.Chunk{
Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one.
Data: chk.Chunk.Bytes(),
},
})
frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size()

// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = chIter.Next()
if frameBytesLeft > 0 && isNext {
continue
}

isNext = it.Next()
if isNext && chk.NumSamples() < maxSamplesPerChunk {
continue
if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
respSeries.Chunks = respSeries.Chunks[:0]
}
if err := chIter.Err(); err != nil {
return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error())
}

// Cut the chunk.
chks = append(chks, storepb.AggrChunk{
MinTime: chkMint,
MaxTime: chkMaxt,
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()},
})
chk = nil
}
if it.Err() != nil {
return nil, errors.Wrap(it.Err(), "read TSDB series")
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
for _, w := range set.Warnings() {
if err := srv.Send(storepb.NewWarnSeriesResponse(w)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
}

return chks, nil

return nil
}

// translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally
Expand Down Expand Up @@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb.
func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
q, err := s.db.Querier(ctx, r.Start, r.End)
q, err := s.db.ChunkQuerier(ctx, r.Start, r.End)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest
func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
q, err := s.db.Querier(ctx, r.Start, r.End)
q, err := s.db.ChunkQuerier(ctx, r.Start, r.End)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
1 change: 0 additions & 1 deletion pkg/store/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func TestTSDBStore_LabelValues(t *testing.T) {
}

tsdbStore := NewTSDBStore(nil, nil, db, component.Rule, labels.FromStrings("region", "eu-west"))

now := time.Now()
head := db.Head()
for _, tc := range []struct {
Expand Down