Skip to content

Commit

Permalink
expose more series size stats
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 10, 2024
1 parent 51c7dcd commit 95b8c17
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 61 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-key-series-ratio` flag to mark posting group as lazy if its number of keys exceeds ratio * max series the query should fetch where ratio is the flag value. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#7957](https://github.com/thanos-io/thanos/pull/7957) Compact, Store Gateway: Compactor will set series p90, p99, p999 and p9999 size in `meta.json` file. Lazy expanded postings will choose estimated series size based on `estimated-series-size-stat` flag.

### Changed

Expand Down
4 changes: 4 additions & 0 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ func processDownsampling(
}
if stats.SeriesMaxSize > 0 {
meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
meta.Thanos.IndexStats.SeriesP90Size = stats.SeriesP90Size
meta.Thanos.IndexStats.SeriesP99Size = stats.SeriesP99Size
meta.Thanos.IndexStats.SeriesP999Size = stats.SeriesP999Size
meta.Thanos.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
}
if err := meta.WriteToDir(logger, resdir); err != nil {
return errors.Wrap(err, "write meta")
Expand Down
38 changes: 38 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedSeriesSizeStat string
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
Expand Down Expand Up @@ -165,6 +166,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize)

cmd.Flag("estimated-series-size-stat", "Statistic to use to estimate block series size. This is currently used for lazy expanded posting series size estimation. Available options are max, p90, p99, p999 and p9999. Default value is "+string(store.BlockSeriesSizeMax)).
Default(string(store.BlockSeriesSizeMax)).
EnumVar(&sc.estimatedSeriesSizeStat, string(store.BlockSeriesSizeMax), string(store.BlockSeriesSizeP99), string(store.BlockSeriesSizeP999), string(store.BlockSeriesSizeP9999))

cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize)

Expand Down Expand Up @@ -402,6 +407,8 @@ func runStore(
return errors.Wrap(err, "create chunk pool")
}

estimatedSeriesSizeStat := strings.ToLower(conf.estimatedSeriesSizeStat)

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
Expand All @@ -425,6 +432,37 @@ func runStore(
}
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedSeriesSizeFunc(func(m metadata.Meta) uint64 {
switch estimatedSeriesSizeStat {
case string(store.BlockSeriesSizeMax):
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
case string(store.BlockSeriesSizeP90):
if m.Thanos.IndexStats.SeriesP90Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP90Size)
}
case string(store.BlockSeriesSizeP99):
if m.Thanos.IndexStats.SeriesP99Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP99Size)
}
case string(store.BlockSeriesSizeP999):
if m.Thanos.IndexStats.SeriesP999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP999Size)
}
case string(store.BlockSeriesSizeP9999):
if m.Thanos.IndexStats.SeriesP9999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP9999Size)
}
}

// Always fallback to series max size if none of other stats available.
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
// If series max size not available from the metadata, fallback to the configured default.
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedMaxChunkFunc(func(m metadata.Meta) uint64 {
if m.Thanos.IndexStats.ChunkMaxSize > 0 &&
uint64(m.Thanos.IndexStats.ChunkMaxSize) < conf.estimatedMaxChunkSize {
Expand Down
6 changes: 6 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ Flags:
--no-cache-index-header option is specified.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
--estimated-series-size-stat=max
Statistic to use to estimate block series size.
This is currently used for lazy expanded
posting series size estimation. Available
options are max, p90, p99, p999 and p9999.
Default value is max
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ require (

require (
capnproto.org/go/capnp/v3 v3.0.0-alpha.30
github.com/DataDog/sketches-go v1.4.6
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU=
github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/sketches-go v1.4.6 h1:acd5fb+QdUzGrosfNLwrIhqyrbMORpvBy7mE+vHlT3I=
github.com/DataDog/sketches-go v1.4.6/go.mod h1:7Y8GN8Jf66DLyDhc94zuWA3uHEt/7ttt8jHOBWWrSOg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 h1:i84ZOPT35YCJROyuf97VP/VEdYhQce/8NTLOWq5tqJw=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3/go.mod h1:3+qm+VCJbVmQ9uscVz+8h1rRkJEy9ZNFGgpT1XB9mPg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 h1:FhsH8qgWFkkPlPXBZ68uuT/FH/R+DLTtVPxjLEBs1v4=
Expand Down
19 changes: 11 additions & 8 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/extprom"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -144,7 +143,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 702, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
Expand Down Expand Up @@ -185,7 +184,11 @@ func TestUpload(t *testing.T) {
}
],
"index_stats": {
"series_max_size": 16
"series_max_size": 16,
"series_p90_size": 16,
"series_p99_size": 16,
"series_p999_size": 16,
"series_p9999_size": 16
}
}
}
Expand All @@ -197,7 +200,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 702, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down Expand Up @@ -229,7 +232,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
testutil.Equals(t, 681, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

Expand Down
75 changes: 69 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/DataDog/sketches-go/ddsketch"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -85,9 +86,13 @@ type HealthStats struct {
ChunkAvgSize int64
ChunkMaxSize int64

SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesP9999Size int64
SeriesP999Size int64
SeriesP99Size int64
SeriesP90Size int64

SingleSampleSeries int64
SingleSampleChunks int64
Expand Down Expand Up @@ -209,6 +214,60 @@ func (n *minMaxSumInt64) Avg() int64 {
return n.sum / n.cnt
}

// sketch is a wrapper for DDSketch which allows to calculate quantile values with a relative accuracy.
type sketch struct {
cnt int64
s *ddsketch.DDSketch
}

func newSketch() *sketch {
// Always valid if accuracy is within (0, 1).
// Hardcode 0.1 relative accuracy as we only need int precision.
dd, _ := ddsketch.NewDefaultDDSketch(0.1)
return &sketch{s: dd}
}

func (s *sketch) Add(v int64) {
s.cnt++
// Impossible to happen since v should > 0.
_ = s.s.Add(float64(v))
}

func (s *sketch) Avg() int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if sketch is not empty.
return int64(s.s.GetSum()) / s.cnt
}

func (s *sketch) Max() int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if sketch is not empty.
v, _ := s.s.GetMaxValue()
return int64(v)
}

func (s *sketch) Min() int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if sketch is not empty.
v, _ := s.s.GetMinValue()
return int64(v)
}

func (s *sketch) Quantile(quantile float64) int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if quantile is valid and sketch is not empty.
v, _ := s.s.GetValueAtQuantile(quantile)
return int64(v)
}

// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// helps to assess index health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
Expand Down Expand Up @@ -237,7 +296,7 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
seriesChunks = newMinMaxSumInt64()
chunkDuration = newMinMaxSumInt64()
chunkSize = newMinMaxSumInt64()
seriesSize = newMinMaxSumInt64()
seriesSize = newSketch()
)

lnames, err := r.LabelNames(ctx)
Expand Down Expand Up @@ -391,9 +450,13 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
stats.ChunkAvgSize = chunkSize.Avg()
stats.ChunkMinSize = chunkSize.min

stats.SeriesMaxSize = seriesSize.max
stats.SeriesMaxSize = seriesSize.Max()
stats.SeriesAvgSize = seriesSize.Avg()
stats.SeriesMinSize = seriesSize.min
stats.SeriesMinSize = seriesSize.Min()
stats.SeriesP90Size = seriesSize.Quantile(0.90)
stats.SeriesP99Size = seriesSize.Quantile(0.99)
stats.SeriesP999Size = seriesSize.Quantile(0.999)
stats.SeriesP9999Size = seriesSize.Quantile(0.9999)

stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
Expand Down
23 changes: 21 additions & 2 deletions pkg/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"path/filepath"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/efficientgo/core/testutil"
"github.com/stretchr/testify/require"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
Expand Down Expand Up @@ -96,3 +96,22 @@ func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
testutil.Equals(t, 1, stats.OutOfOrderChunks)
testutil.NotOk(t, stats.OutOfOrderChunksErr())
}

func TestSketch(t *testing.T) {
s := newSketch()
// Empty.
require.Equal(t, int64(0), s.cnt)
require.Equal(t, int64(0), s.Max())
require.Equal(t, int64(0), s.Min())
require.Equal(t, int64(0), s.Avg())
require.Equal(t, int64(0), s.Quantile(0.9))

s.Add(1)
s.Add(2)
s.Add(3)
require.Equal(t, int64(3), s.cnt)
require.Equal(t, int64(3), s.Max())
require.Equal(t, int64(1), s.Min())
require.Equal(t, int64(2), s.Avg())
require.Equal(t, int64(2), s.Quantile(0.9))
}
8 changes: 6 additions & 2 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ type Thanos struct {
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
SeriesP90Size int64 `json:"series_p90_size,omitempty"`
SeriesP99Size int64 `json:"series_p99_size,omitempty"`
SeriesP999Size int64 `json:"series_p999_size,omitempty"`
SeriesP9999Size int64 `json:"series_p9999_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

func (m *Thanos) ParseExtensions(v any) (any, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}
if stats.SeriesMaxSize > 0 {
thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
thanosMeta.IndexStats.SeriesP90Size = stats.SeriesP90Size
thanosMeta.IndexStats.SeriesP99Size = stats.SeriesP99Size
thanosMeta.IndexStats.SeriesP999Size = stats.SeriesP999Size
thanosMeta.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
}
newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
// Only one chunk will be generated in that block, so we won't set chunk size.
testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP90Size > 0, "compacted blocks have index stats series P90 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP99Size > 0, "compacted blocks have index stats series P99 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP999Size > 0, "compacted blocks have index stats series P999 size set")
testutil.Assert(t, meta.Thanos.IndexStats.SeriesP9999Size > 0, "compacted blocks have index stats series P9999 size set")
}
})
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,14 @@ func TestBucketStore_Acceptance(t *testing.T) {
Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
IndexStats: metadata.IndexStats{
SeriesMaxSize: stats.SeriesMaxSize,
SeriesP90Size: stats.SeriesP90Size,
SeriesP99Size: stats.SeriesP99Size,
SeriesP999Size: stats.SeriesP999Size,
SeriesP9999Size: stats.SeriesP9999Size,
ChunkMaxSize: stats.ChunkMaxSize,
},
}, nil)
testutil.Ok(tt, err)

Expand Down Expand Up @@ -1115,7 +1122,14 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
IndexStats: metadata.IndexStats{
SeriesMaxSize: stats.SeriesMaxSize,
SeriesP90Size: stats.SeriesP90Size,
SeriesP99Size: stats.SeriesP99Size,
SeriesP999Size: stats.SeriesP999Size,
SeriesP9999Size: stats.SeriesP9999Size,
ChunkMaxSize: stats.ChunkMaxSize,
},
}, nil)
testutil.Ok(tt, err)

Expand Down
Loading

0 comments on commit 95b8c17

Please sign in to comment.