Skip to content

Commit

Permalink
fix reader getting wrong posting offsets when querying multiple values (
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 authored and hczhu-db committed Aug 22, 2024
1 parent 72ea302 commit 2736cf9
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types.
- [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries.
- [#7308](https://github.com/thanos-io/thanos/pull/7308) Store: Batch TSDB Infos for blocks.
- [#7301](https://github.com/thanos-io/thanos/pull/7301) Store Gateway: fix index header reader `PostingsOffsets` returning wrong values.

### Added

Expand Down
12 changes: 11 additions & 1 deletion pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra

// Iterate on the offset table.
newSameRngs = newSameRngs[:0]
Iter:
for d.Err() == nil {
// Posting format entry is as follows:
// │ ┌────────────────────────────────────────┐ │
Expand Down Expand Up @@ -916,6 +917,15 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
break
}
wantedValue = values[valueIndex]
// Only do this if there is no new range added. If there is an existing
// range we want to continue iterating the offset table to get the end.
if len(newSameRngs) == 0 && i+1 < len(e.offsets) {
// We want to limit this loop within e.offsets[i, i+1). So when the wanted value
// is >= e.offsets[i+1], go out of the loop and binary search again.
if wantedValue >= e.offsets[i+1].value {
break Iter
}
}
}

if i+1 == len(e.offsets) {
Expand All @@ -942,7 +952,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra

if len(newSameRngs) > 0 {
// We added some ranges in this iteration. Use next posting offset as the end of our ranges.
// We know it exists as we never go further in this loop than e.offsets[i, i+1].
// We know it exists as we never go further in this loop than e.offsets[i, i+1).

skipNAndName(&d, &buf)
d.UvarintBytes() // Label value.
Expand Down
122 changes: 114 additions & 8 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"context"
"fmt"
"math"
"math/rand"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
Expand All @@ -18,6 +21,7 @@ import (
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

Expand Down Expand Up @@ -53,6 +57,9 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}},
{{Name: "cluster", Value: "a-eu-west-1"}},
{{Name: "cluster", Value: "b-eu-west-1"}},
{{Name: "cluster", Value: "c-eu-west-1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

Expand Down Expand Up @@ -108,15 +115,15 @@ func TestReaders(t *testing.T) {
if id == id1 {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 70}, br.toc)
testutil.Equals(t, int64(710), br.indexLastPostingEnd)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 114}, br.toc)
testutil.Equals(t, int64(905), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
testutil.Equals(t, 3, len(br.nameSymbols))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 440,
lastValOffset: 576,
},
"a": {
offsets: []postingOffset{
Expand All @@ -126,14 +133,21 @@ func TestReaders(t *testing.T) {
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 640,
lastValOffset: 776,
},
"cluster": {
offsets: []postingOffset{
{value: "a-eu-west-1", tableOff: 96},
{value: "c-eu-west-1", tableOff: 142},
},
lastValOffset: 824,
},
"longer-string": {
offsets: []postingOffset{
{value: "1", tableOff: 96},
{value: "2", tableOff: 115},
{value: "1", tableOff: 165},
{value: "2", tableOff: 184},
},
lastValOffset: 706,
lastValOffset: 901,
},
}, br.postings)

Expand Down Expand Up @@ -173,6 +187,17 @@ func TestReaders(t *testing.T) {
testutil.Assert(t, rngs[2].End > rngs[2].Start)
testutil.Equals(t, NotFoundRange, rngs[1])

// 3 values exist and 3 values don't exist.
rngs, err = br.PostingsOffsets("cluster", "a-eu-west-1", "a-us-west-2", "b-eu-west-1", "b-us-east-1", "c-eu-west-1", "c-us-east-2")
testutil.Ok(t, err)
for i := 0; i < len(rngs); i++ {
if i%2 == 0 {
testutil.Assert(t, rngs[i].End > rngs[i].Start)
} else {
testutil.Equals(t, NotFoundRange, rngs[i])
}
}

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most of not existing value was working despite bug, except in certain unlucky cases
// it was causing "invalid size" errors.
Expand Down Expand Up @@ -521,3 +546,84 @@ func readSymbols(bs index.ByteSlice, version, off int) ([]string, map[uint32]str
}
return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
}

// The idea of this test case is to make sure that reader.PostingsOffsets and
// reader.PostingsOffset get the same index ranges for required label values.
func TestReaderPostingsOffsets(t *testing.T) {
ctx := context.Background()

tmpDir := t.TempDir()

possibleClusters := []string{"us-west-2", "us-east-1", "us-east-2", "eu-west-1", "eu-central-1", "ap-southeast-1", "ap-south-1"}
possiblePrefixes := []string{"a", "b", "c", "d", "1", "2", "3", "4"}
totalValues := []string{}
for i := 0; i < len(possibleClusters); i++ {
for j := 0; j < len(possiblePrefixes); j++ {
totalValues = append(totalValues, fmt.Sprintf("%s-%s", possiblePrefixes[j], possibleClusters[i]))
}
}

rnd := rand.New(rand.NewSource(time.Now().Unix()))
// Pick 5 label values to be used in the block.
clusterLbls := make([]labels.Labels, 0)
valueSet := map[int]struct{}{}
for i := 0; i < 5; {
idx := rnd.Intn(len(totalValues))
if _, ok := valueSet[idx]; ok {
continue
}
valueSet[idx] = struct{}{}
clusterLbls = append(clusterLbls, []labels.Label{
{Name: "cluster", Value: totalValues[idx]},
})
i++
}

// Add additional labels.
lbls := append([]labels.Labels{
{{Name: "job", Value: "1"}},
{{Name: "job", Value: "2"}},
{{Name: "job", Value: "3"}},
{{Name: "job", Value: "4"}},
{{Name: "job", Value: "5"}},
{{Name: "job", Value: "6"}},
{{Name: "job", Value: "7"}},
{{Name: "job", Value: "8"}},
{{Name: "job", Value: "9"}}}, clusterLbls...)
bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()
id, err := e2eutil.CreateBlock(ctx, tmpDir, lbls, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id.String()), metadata.NoneFunc))

fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
_, err = WriteBinary(ctx, bkt, id, fn)
testutil.Ok(t, err)

br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()

for i := 0; i < 100; i++ {
vals := make([]string, 0, 15)
for j := 0; j < 15; j++ {
vals = append(vals, totalValues[rnd.Intn(len(totalValues))])
}
sort.Strings(vals)
rngs, err := br.PostingsOffsets("cluster", vals...)
require.NoError(t, err)
rngs2 := make([]index.Range, 0)
for _, val := range vals {
rng2, err2 := br.PostingsOffset("cluster", val)
if err2 == NotFoundRangeErr {
rngs2 = append(rngs2, NotFoundRange)
} else {
rngs2 = append(rngs2, rng2)
}
}
require.Equal(t, rngs2, rngs, "Got mismatched results from batched and non-batched API.\nInput cluster labels: %v.\nValues queried: %v", clusterLbls, vals)
}
}
11 changes: 8 additions & 3 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"strings"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -54,14 +55,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
}

for _, r := range rngs {
if r == indexheader.NotFoundRange {
for _, rng := range rngs {
if rng == indexheader.NotFoundRange {
continue
}
if rng.End <= rng.Start {
level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
return postingGroups, false, nil
}
// Each range starts from the #entries field which is 4 bytes.
// Need to subtract it when calculating number of postings.
// https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md.
pg.cardinality += (r.End - r.Start - 4) / 4
pg.cardinality += (rng.End - rng.Start - 4) / 4
}
// If the posting group adds keys, 0 cardinality means the posting doesn't exist.
// If the posting group removes keys, no posting ranges found is fine as it is a noop.
Expand Down

0 comments on commit 2736cf9

Please sign in to comment.