diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3bf3378b5a..b47b0b2673 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,7 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 ### Added
 
 - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
-- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
+- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-key-series-ratio` flag to mark a posting group as lazy if its number of keys exceeds `ratio * max series the query should fetch`, where `ratio` is the flag value. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for the total number of lazy posting groups and the corresponding reasons.
+- [#7957](https://github.com/thanos-io/thanos/pull/7957) Compact, Store Gateway: Compactor now sets the series p90, p99, p999 and p9999 sizes in the `meta.json` file. Lazy expanded postings choose the estimated series size based on the `--estimated-series-size-stat` flag.
 
 ### Changed
 
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 556369b0a1..db6870ebfb 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -411,6 +411,10 @@ func processDownsampling(
   }
   if stats.SeriesMaxSize > 0 {
     meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
+    meta.Thanos.IndexStats.SeriesP90Size = stats.SeriesP90Size
+    meta.Thanos.IndexStats.SeriesP99Size = stats.SeriesP99Size
+    meta.Thanos.IndexStats.SeriesP999Size = stats.SeriesP999Size
+    meta.Thanos.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
   }
   if err := meta.WriteToDir(logger, resdir); err != nil {
     return errors.Wrap(err, "write meta")
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 1cdc12679c..c3e5acb4f7 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -76,6 +76,7 @@ type storeConfig struct {
   indexCacheSizeBytes     units.Base2Bytes
   chunkPoolSize           units.Base2Bytes
   estimatedMaxSeriesSize  uint64
+  estimatedSeriesSizeStat string
   estimatedMaxChunkSize   uint64
   seriesBatchSize         int
   storeRateLimits         store.SeriesSelectLimits
@@ -165,6 +166,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
   cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.").
     Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize)
 
+  cmd.Flag("estimated-series-size-stat", "Statistic to use to estimate block series size. This is currently used for lazy expanded posting series size estimation. Available options are max, p90, p99, p999 and p9999. Default value is "+string(store.BlockSeriesSizeMax)).
+    Default(string(store.BlockSeriesSizeMax)).
+    EnumVar(&sc.estimatedSeriesSizeStat, string(store.BlockSeriesSizeMax), string(store.BlockSeriesSizeP90), string(store.BlockSeriesSizeP99), string(store.BlockSeriesSizeP999), string(store.BlockSeriesSizeP9999))
+
   cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.").
     Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize)
@@ -402,6 +407,8 @@ func runStore(
     return errors.Wrap(err, "create chunk pool")
   }
 
+  estimatedSeriesSizeStat := strings.ToLower(conf.estimatedSeriesSizeStat)
+
   options := []store.BucketStoreOption{
     store.WithLogger(logger),
     store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
@@ -425,6 +432,37 @@ func runStore(
       }
       return conf.estimatedMaxSeriesSize
     }),
+    store.WithBlockEstimatedSeriesSizeFunc(func(m metadata.Meta) uint64 {
+      switch estimatedSeriesSizeStat {
+      case string(store.BlockSeriesSizeMax):
+        if m.Thanos.IndexStats.SeriesMaxSize > 0 {
+          return uint64(m.Thanos.IndexStats.SeriesMaxSize)
+        }
+      case string(store.BlockSeriesSizeP90):
+        if m.Thanos.IndexStats.SeriesP90Size > 0 {
+          return uint64(m.Thanos.IndexStats.SeriesP90Size)
+        }
+      case string(store.BlockSeriesSizeP99):
+        if m.Thanos.IndexStats.SeriesP99Size > 0 {
+          return uint64(m.Thanos.IndexStats.SeriesP99Size)
+        }
+      case string(store.BlockSeriesSizeP999):
+        if m.Thanos.IndexStats.SeriesP999Size > 0 {
+          return uint64(m.Thanos.IndexStats.SeriesP999Size)
+        }
+      case string(store.BlockSeriesSizeP9999):
+        if m.Thanos.IndexStats.SeriesP9999Size > 0 {
+          return uint64(m.Thanos.IndexStats.SeriesP9999Size)
+        }
+      }
+
+      // Always fall back to the max series size if none of the other stats are available.
+      if m.Thanos.IndexStats.SeriesMaxSize > 0 {
+        return uint64(m.Thanos.IndexStats.SeriesMaxSize)
+      }
+      // If the max series size is not available from the metadata either, fall back to the configured default.
+      return conf.estimatedMaxSeriesSize
+    }),
     store.WithBlockEstimatedMaxChunkFunc(func(m metadata.Meta) uint64 {
       if m.Thanos.IndexStats.ChunkMaxSize > 0 &&
         uint64(m.Thanos.IndexStats.ChunkMaxSize) < conf.estimatedMaxChunkSize {
diff --git a/docs/components/store.md b/docs/components/store.md
index ce2adb6d6d..e2c93a3738 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -94,6 +94,12 @@ Flags:
                                  --no-cache-index-header option is specified.
       --enable-auto-gomemlimit   Enable go runtime to automatically limit
                                  memory consumption.
+      --estimated-series-size-stat=max
+                                 Statistic to use to estimate block series size.
+                                 This is currently used for lazy expanded
+                                 posting series size estimation. Available
+                                 options are max, p90, p99, p999 and p9999.
+                                 Default value is max
       --grpc-address="0.0.0.0:10901"
                                  Listen ip:port address for gRPC endpoints
                                  (StoreAPI).
                                 Make sure this address is routable
diff --git a/go.mod b/go.mod
index 9a725cb167..6208f3d21c 100644
--- a/go.mod
+++ b/go.mod
@@ -113,6 +113,7 @@ require (
 	capnproto.org/go/capnp/v3 v3.0.0-alpha.30
+	github.com/DataDog/sketches-go v1.4.6
 	github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
 	github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
 	github.com/hashicorp/golang-lru/v2 v2.0.7
diff --git a/go.sum b/go.sum
index 9933abda40..8e8ed20893 100644
--- a/go.sum
+++ b/go.sum
@@ -1349,6 +1349,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
 github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU=
 github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
 github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
+github.com/DataDog/sketches-go v1.4.6 h1:acd5fb+QdUzGrosfNLwrIhqyrbMORpvBy7mE+vHlT3I=
+github.com/DataDog/sketches-go v1.4.6/go.mod h1:7Y8GN8Jf66DLyDhc94zuWA3uHEt/7ttt8jHOBWWrSOg=
 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 h1:i84ZOPT35YCJROyuf97VP/VEdYhQce/8NTLOWq5tqJw=
 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3/go.mod h1:3+qm+VCJbVmQ9uscVz+8h1rRkJEy9ZNFGgpT1XB9mPg=
 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 h1:FhsH8qgWFkkPlPXBZ68uuT/FH/R+DLTtVPxjLEBs1v4=
diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go
index 1ebe6b1c6a..7c283247fc 100644
--- a/pkg/block/block_test.go
+++ b/pkg/block/block_test.go
@@ -15,8 +15,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/thanos-io/thanos/pkg/extprom"
-
+	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
@@ -24,10 +23,10 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/objstore"
 
-	"github.com/efficientgo/core/testutil"
+	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/testutil/custom"
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 )
@@ -144,7 +143,7 @@ func TestUpload(t *testing.T) {
   testutil.Equals(t, 3, len(bkt.Objects()))
   testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
   testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
-  testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
+  testutil.Equals(t, 702, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
 
   // File stats are gathered.
   testutil.Equals(t, fmt.Sprintf(`{
@@ -185,7 +184,11 @@ func TestUpload(t *testing.T) {
       }
     ],
     "index_stats": {
-      "series_max_size": 16
+      "series_max_size": 16,
+      "series_p90_size": 16,
+      "series_p99_size": 16,
+      "series_p999_size": 16,
+      "series_p9999_size": 16
     }
   }
 }
@@ -197,7 +200,7 @@ func TestUpload(t *testing.T) {
   testutil.Equals(t, 3, len(bkt.Objects()))
   testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
   testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
-  testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
+  testutil.Equals(t, 702, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
 }
 {
   // Upload with no external labels should be blocked.
@@ -229,7 +232,7 @@ func TestUpload(t *testing.T) {
   testutil.Equals(t, 6, len(bkt.Objects()))
   testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
   testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
-  testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
+  testutil.Equals(t, 681, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
 }
 }
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 7d4c876154..091c93c53a 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/DataDog/sketches-go/ddsketch"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
@@ -85,9 +86,13 @@ type HealthStats struct {
   ChunkAvgSize int64
   ChunkMaxSize int64
 
-  SeriesMinSize int64
-  SeriesAvgSize int64
-  SeriesMaxSize int64
+  SeriesMinSize   int64
+  SeriesAvgSize   int64
+  SeriesMaxSize   int64
+  SeriesP9999Size int64
+  SeriesP999Size  int64
+  SeriesP99Size   int64
+  SeriesP90Size   int64
 
   SingleSampleSeries int64
   SingleSampleChunks int64
@@ -209,6 +214,60 @@ func (n *minMaxSumInt64) Avg() int64 {
   return n.sum / n.cnt
 }
 
+// sketch is a wrapper around DDSketch which allows calculating quantile values with a relative accuracy.
+type sketch struct {
+  cnt int64
+  s   *ddsketch.DDSketch
+}
+
+func newSketch() *sketch {
+  // The constructor always succeeds if the accuracy is within (0, 1).
+  // Hardcode 0.1 relative accuracy as we only need integer precision.
+  dd, _ := ddsketch.NewDefaultDDSketch(0.1)
+  return &sketch{s: dd}
+}
+
+func (s *sketch) Add(v int64) {
+  s.cnt++
+  // Adding cannot fail since v should be > 0.
+  _ = s.s.Add(float64(v))
+}
+
+func (s *sketch) Avg() int64 {
+  if s.cnt == 0 {
+    return 0
+  }
+  // GetSum tracks the running sum of added values, so dividing by cnt gives the average.
+  return int64(s.s.GetSum()) / s.cnt
+}
+
+func (s *sketch) Max() int64 {
+  if s.cnt == 0 {
+    return 0
+  }
+  // An error cannot occur since the sketch is not empty.
+  v, _ := s.s.GetMaxValue()
+  return int64(v)
+}
+
+func (s *sketch) Min() int64 {
+  if s.cnt == 0 {
+    return 0
+  }
+  // An error cannot occur since the sketch is not empty.
+  v, _ := s.s.GetMinValue()
+  return int64(v)
+}
+
+func (s *sketch) Quantile(quantile float64) int64 {
+  if s.cnt == 0 {
+    return 0
+  }
+  // An error cannot occur since the quantile is valid and the sketch is not empty.
+  v, _ := s.s.GetValueAtQuantile(quantile)
+  return int64(v)
+}
+
 // GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
 // helps to assess index health.
 // It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
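Aside: the `sketch` wrapper above leans on DDSketch's relative-accuracy guarantee. Below is a minimal, self-contained sketch (not part of the patch) using the same `github.com/DataDog/sketches-go/ddsketch` calls as the wrapper (`NewDefaultDDSketch`, `Add`, `GetValueAtQuantile`, `GetMaxValue`); the series sizes are invented for illustration. It shows why a quantile statistic can be a far better series-size estimate than the max when a block contains a few outlier series:

```go
package main

import (
	"fmt"

	"github.com/DataDog/sketches-go/ddsketch"
)

func main() {
	// 0.1 relative accuracy mirrors the value hardcoded in newSketch:
	// returned quantiles are within roughly ±10% of the true value.
	s, err := ddsketch.NewDefaultDDSketch(0.1)
	if err != nil {
		panic(err)
	}

	// Hypothetical block: 10k typical ~100B series plus one 64KB outlier.
	for i := 0; i < 10000; i++ {
		_ = s.Add(100)
	}
	_ = s.Add(64 * 1024)

	p90, _ := s.GetValueAtQuantile(0.90)
	p99, _ := s.GetValueAtQuantile(0.99)
	max, _ := s.GetMaxValue()
	// p90 and p99 stay near 100 bytes while max is dominated by the single outlier.
	fmt.Printf("p90=%.0f p99=%.0f max=%.0f\n", p90, p99, max)
}
```

This spread between the percentiles and the max is exactly what the new `series_p*_size` fields in `meta.json` are meant to capture.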
@@ -237,7 +296,7 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
   seriesChunks  = newMinMaxSumInt64()
   chunkDuration = newMinMaxSumInt64()
   chunkSize     = newMinMaxSumInt64()
-  seriesSize    = newMinMaxSumInt64()
+  seriesSize    = newSketch()
 )
 
 lnames, err := r.LabelNames(ctx)
@@ -391,9 +450,13 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
 stats.ChunkAvgSize = chunkSize.Avg()
 stats.ChunkMinSize = chunkSize.min
 
-stats.SeriesMaxSize = seriesSize.max
+stats.SeriesMaxSize = seriesSize.Max()
 stats.SeriesAvgSize = seriesSize.Avg()
-stats.SeriesMinSize = seriesSize.min
+stats.SeriesMinSize = seriesSize.Min()
+stats.SeriesP90Size = seriesSize.Quantile(0.90)
+stats.SeriesP99Size = seriesSize.Quantile(0.99)
+stats.SeriesP999Size = seriesSize.Quantile(0.999)
+stats.SeriesP9999Size = seriesSize.Quantile(0.9999)
 
 stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
 stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go
index 02097d4c4b..32ed5e2b65 100644
--- a/pkg/block/index_test.go
+++ b/pkg/block/index_test.go
@@ -10,13 +10,13 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/index"
-
-	"github.com/efficientgo/core/testutil"
+	"github.com/stretchr/testify/require"
 
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
@@ -96,3 +96,22 @@ func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
   testutil.Equals(t, 1, stats.OutOfOrderChunks)
   testutil.NotOk(t, stats.OutOfOrderChunksErr())
 }
+
+func TestSketch(t *testing.T) {
+  s := newSketch()
+  // Empty.
+  require.Equal(t, int64(0), s.cnt)
+  require.Equal(t, int64(0), s.Max())
+  require.Equal(t, int64(0), s.Min())
+  require.Equal(t, int64(0), s.Avg())
+  require.Equal(t, int64(0), s.Quantile(0.9))
+
+  s.Add(1)
+  s.Add(2)
+  s.Add(3)
+  require.Equal(t, int64(3), s.cnt)
+  require.Equal(t, int64(3), s.Max())
+  require.Equal(t, int64(1), s.Min())
+  require.Equal(t, int64(2), s.Avg())
+  require.Equal(t, int64(2), s.Quantile(0.9))
+}
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
index 11567fb06e..1135958d09 100644
--- a/pkg/block/metadata/meta.go
+++ b/pkg/block/metadata/meta.go
@@ -100,8 +100,12 @@ type Thanos struct {
 }
 
 type IndexStats struct {
-  SeriesMaxSize int64 `json:"series_max_size,omitempty"`
-  ChunkMaxSize  int64 `json:"chunk_max_size,omitempty"`
+  SeriesMaxSize   int64 `json:"series_max_size,omitempty"`
+  SeriesP90Size   int64 `json:"series_p90_size,omitempty"`
+  SeriesP99Size   int64 `json:"series_p99_size,omitempty"`
+  SeriesP999Size  int64 `json:"series_p999_size,omitempty"`
+  SeriesP9999Size int64 `json:"series_p9999_size,omitempty"`
+  ChunkMaxSize    int64 `json:"chunk_max_size,omitempty"`
 }
 
 func (m *Thanos) ParseExtensions(v any) (any, error) {
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index a20544b2f4..a8169b2980 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -1303,6 +1303,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
   }
   if stats.SeriesMaxSize > 0 {
     thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
+    thanosMeta.IndexStats.SeriesP90Size = stats.SeriesP90Size
+    thanosMeta.IndexStats.SeriesP99Size = stats.SeriesP99Size
+    thanosMeta.IndexStats.SeriesP999Size = stats.SeriesP999Size
+    thanosMeta.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
   }
   newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
   if err != nil {
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index c0232547ca..57c331543c 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -401,6 +401,10 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
   testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
   // Only one chunk will be generated in that block, so we won't set chunk size.
   testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set")
+  testutil.Assert(t, meta.Thanos.IndexStats.SeriesP90Size > 0, "compacted blocks have index stats series P90 size set")
+  testutil.Assert(t, meta.Thanos.IndexStats.SeriesP99Size > 0, "compacted blocks have index stats series P99 size set")
+  testutil.Assert(t, meta.Thanos.IndexStats.SeriesP999Size > 0, "compacted blocks have index stats series P999 size set")
+  testutil.Assert(t, meta.Thanos.IndexStats.SeriesP9999Size > 0, "compacted blocks have index stats series P9999 size set")
   }
  })
 }
diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go
index 48fc00adfb..5a079856d3 100644
--- a/pkg/store/acceptance_test.go
+++ b/pkg/store/acceptance_test.go
@@ -970,7 +970,14 @@ func TestBucketStore_Acceptance(t *testing.T) {
   Labels:     labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
   Downsample: metadata.ThanosDownsample{Resolution: 0},
   Source:     metadata.TestSource,
-  IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
+  IndexStats: metadata.IndexStats{
+    SeriesMaxSize:   stats.SeriesMaxSize,
+    SeriesP90Size:   stats.SeriesP90Size,
+    SeriesP99Size:   stats.SeriesP99Size,
+    SeriesP999Size:  stats.SeriesP999Size,
+    SeriesP9999Size: stats.SeriesP9999Size,
+    ChunkMaxSize:    stats.ChunkMaxSize,
+  },
  }, nil)
  testutil.Ok(tt, err)
@@ -1115,7 +1122,14 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
   Labels:     extLset.Map(),
   Downsample: metadata.ThanosDownsample{Resolution: 0},
   Source:     metadata.TestSource,
-  IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
+  IndexStats: metadata.IndexStats{
+    SeriesMaxSize:   stats.SeriesMaxSize,
+    SeriesP90Size:   stats.SeriesP90Size,
+    SeriesP99Size:   stats.SeriesP99Size,
+    SeriesP999Size:  stats.SeriesP999Size,
+    SeriesP9999Size: stats.SeriesP9999Size,
+    ChunkMaxSize:    stats.ChunkMaxSize,
+  },
  }, nil)
  testutil.Ok(tt, err)
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 1c434f503f..4f3b8635ac 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -73,6 +73,16 @@ const (
 	ChunksTouched
 )
 
+type BlockSeriesSizeStat string
+
+const (
+	BlockSeriesSizeMax   BlockSeriesSizeStat = "max"
+	BlockSeriesSizeP90   BlockSeriesSizeStat = "p90"
+	BlockSeriesSizeP99   BlockSeriesSizeStat = "p99"
+	BlockSeriesSizeP999  BlockSeriesSizeStat = "p999"
+	BlockSeriesSizeP9999 BlockSeriesSizeStat = "p9999"
+)
+
 const (
 	// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
 	// for precalculating the number of samples that we may have to retrieve and decode for any given query
@@ -430,8 +440,14 @@ type BucketStore struct {
   sortingStrategy sortingStrategy
 
+  // blockEstimatedMaxSeriesFunc is a function which estimates the max series size of a block for series fetching.
+  // We want to use the max series size as the metric here to avoid series re-fetches.
   blockEstimatedMaxSeriesFunc BlockEstimator
   blockEstimatedMaxChunkFunc  BlockEstimator
+  // blockEstimatedSeriesSizeFunc is a function which estimates the series size of a block based on the configured
+  // strategy. It can be the max, P90, P99, P999, etc. series size of the block, and is used for lazy posting
+  // series size estimation, where there is no strict requirement to use the block's max series size.
+  blockEstimatedSeriesSizeFunc BlockEstimator
 
   indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc
@@ -546,6 +562,12 @@
  }
 }
 
+func WithBlockEstimatedSeriesSizeFunc(f BlockEstimator) BucketStoreOption {
+  return func(s *BucketStore) {
+    s.blockEstimatedSeriesSizeFunc = f
+  }
+}
+
 func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption {
   return func(s *BucketStore) {
     s.blockEstimatedMaxChunkFunc = f
@@ -833,6 +855,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
   indexHeaderReader,
   s.partitioner,
   s.blockEstimatedMaxSeriesFunc,
+  s.blockEstimatedSeriesSizeFunc,
   s.blockEstimatedMaxChunkFunc,
  )
  if err != nil {
@@ -2386,6 +2409,10 @@ type bucketBlock struct {
   estimatedMaxChunkSize  int
   estimatedMaxSeriesSize int
+
+  // estimatedSeriesSize is the estimated series size used for lazy postings. It can be a different metric from
+  // estimatedMaxSeriesSize, since when fetching series we need to use the max series size to avoid re-fetches.
+  estimatedSeriesSize int
 }
 
 func newBucketBlock(
@@ -2399,12 +2426,17 @@
   indexHeadReader indexheader.Reader,
   p Partitioner,
   maxSeriesSizeFunc BlockEstimator,
+  seriesSizeFunc BlockEstimator,
   maxChunkSizeFunc BlockEstimator,
 ) (b *bucketBlock, err error) {
   maxSeriesSize := EstimatedMaxSeriesSize
   if maxSeriesSizeFunc != nil {
     maxSeriesSize = int(maxSeriesSizeFunc(*meta))
   }
+  seriesSize := maxSeriesSize
+  if seriesSizeFunc != nil {
+    seriesSize = int(seriesSizeFunc(*meta))
+  }
   maxChunkSize := EstimatedMaxChunkSize
   if maxChunkSizeFunc != nil {
     maxChunkSize = int(maxChunkSizeFunc(*meta))
@@ -2425,6 +2457,7 @@
   extLset:                extLset,
   relabelLabels:          relabelLabels,
   estimatedMaxSeriesSize: maxSeriesSize,
+  estimatedSeriesSize:    seriesSize,
   estimatedMaxChunkSize:  maxChunkSize,
 }
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index df4d1e189c..a7c4c602c4 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -226,7 +226,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) {
    },
   },
  }
- b, _ := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
+ b, _ := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil, nil)
  ms := []*labels.Matcher{
   {Type: labels.MatchNotEqual, Name: "a", Value: "b"},
  }
@@ -286,7 +286,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
   },
  }
 
- b, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
+ b, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil, nil)
  testutil.Ok(t, err)
 
  cases := []struct {
@@ -1195,7 +1195,14 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
   Labels:     labels.FromStrings("ext1", "1").Map(),
   Downsample: metadata.ThanosDownsample{Resolution: 0},
   Source:     metadata.TestSource,
-  IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
+  IndexStats: metadata.IndexStats{
+    SeriesMaxSize:   stats.SeriesMaxSize,
+    SeriesP90Size:   stats.SeriesP90Size,
+    SeriesP99Size:   stats.SeriesP99Size,
+    SeriesP999Size:  stats.SeriesP999Size,
+    SeriesP9999Size: stats.SeriesP9999Size,
+    ChunkMaxSize:    stats.ChunkMaxSize,
+  },
  },
  nil)
 testutil.Ok(t, err)
 testutil.Ok(t, block.Upload(ctx, logger, bkt, bdir, metadata.NoneFunc))
@@ -1505,8 +1512,12 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
   Downsample: metadata.ThanosDownsample{Resolution: 0},
   Source:     metadata.TestSource,
   IndexStats: metadata.IndexStats{
-   SeriesMaxSize: stats.SeriesMaxSize,
-   ChunkMaxSize:  stats.ChunkMaxSize,
+   SeriesMaxSize:   stats.SeriesMaxSize,
+   SeriesP90Size:   stats.SeriesP90Size,
+   SeriesP99Size:   stats.SeriesP99Size,
+   SeriesP999Size:  stats.SeriesP999Size,
+   SeriesP9999Size: stats.SeriesP9999Size,
+   ChunkMaxSize:    stats.ChunkMaxSize,
   },
  }
@@ -2771,7 +2782,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
  testutil.Ok(b, err)
 
  // Create a bucket block with only the dependencies we need for the benchmark.
- blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil)
+ blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil, nil)
 testutil.Ok(b, err)
 
 b.ResetTimer()
@@ -2860,7 +2871,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
 testutil.Ok(b, err)
 
 // Create a bucket block with only the dependencies we need for the benchmark.
- blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil)
+ blk, err := newBucketBlock(context.Background(), newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil, nil)
 testutil.Ok(b, err)
 return blk, blockMeta
 }
@@ -3550,6 +3561,7 @@ func TestExpandedPostingsRace(t *testing.T) {
   NewGapBasedPartitioner(PartitionerMaxGapSize),
   nil,
   nil,
+  nil,
  )
  testutil.Ok(t, err)
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 81b977f5d3..21cb7b1a30 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -42,7 +42,7 @@ func (p *lazyExpandedPostings) lazyExpanded() bool {
 func optimizePostingsFetchByDownloadedBytes(
   r *bucketIndexReader,
   postingGroups []*postingGroup,
-  seriesMaxSize int64,
+  seriesSize int64,
   seriesMatchRatio float64,
@@ -175,7 +175,7 @@ func optimizePostingsFetchByDownloadedBytes(
     underfetchedSeries = int64(math.Ceil(float64(pg.cardinality) * seriesMatchRatio))
   }
   seriesMatched -= underfetchedSeries
-  underfetchedSeriesSize = underfetchedSeries * seriesMaxSize
+  underfetchedSeriesSize = underfetchedSeries * seriesSize
  } else {
   // Only mark posting group as lazy due to too many keys when those keys are known to be existent.
   if postingGroupMaxKeySeriesRatio > 0 && maxSeriesMatched > 0 &&
@@ -184,7 +184,7 @@ func optimizePostingsFetchByDownloadedBytes(
    i++
    continue
   }
-  underfetchedSeriesSize = seriesMaxSize * int64(math.Ceil(float64(seriesMatched)*(1-seriesMatchRatio)))
+  underfetchedSeriesSize = seriesSize * int64(math.Ceil(float64(seriesMatched)*(1-seriesMatchRatio)))
  seriesMatched = int64(math.Ceil(float64(seriesMatched) * seriesMatchRatio))
 }
@@ -227,16 +227,16 @@ func fetchLazyExpandedPostings(
 There are several cases that we skip postings fetch optimization:
 - Lazy expanded posting disabled.
 - Add all postings. This means we don't have a posting group with any add keys.
- - Block estimated max series size not set which means we don't have a way to estimate series bytes downloaded.
- - `SeriesMaxSize` not set for this block then we have no way to estimate series size.
+ - Block estimated series size not set, which means we have no way to estimate series bytes downloaded.
+ - `SeriesSize` not set for this block, so we have no way to estimate series size.
 - Only one effective posting group available. We need to at least download postings from 1 posting group so no need to optimize.
 */
 if lazyExpandedPostingEnabled && !addAllPostings &&
-  r.block.estimatedMaxSeriesSize > 0 && len(postingGroups) > 1 {
+  r.block.estimatedSeriesSize > 0 && len(postingGroups) > 1 {
  postingGroups, emptyPostingGroup, err = optimizePostingsFetchByDownloadedBytes(
   r,
   postingGroups,
-  int64(r.block.estimatedMaxSeriesSize),
+  int64(r.block.estimatedSeriesSize),
   0.5, // TODO(yeya24): Expose this as a flag.
   postingGroupMaxKeySeriesRatio,
   lazyExpandedPostingSizeBytes,
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index 3eac705871..7bb514025f 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -312,7 +312,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  inputPostings                 map[string]map[string]index.Range
  inputError                    error
  postingGroups                 []*postingGroup
- seriesMaxSize                 int64
+ seriesSize                    int64
  seriesMatchRatio              float64
  postingGroupMaxKeySeriesRatio float64
  expectedPostingGroups         []*postingGroup
@@ -338,7 +338,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
   "bar": {"foo": index.Range{Start: 8, End: 16}},
  },
  inputError:       inputError,
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -350,7 +350,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 {
  name:             "posting offsets empty with add keys, expect empty posting",
  inputPostings:    map[string]map[string]index.Range{},
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -364,7 +364,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 inputPostings: map[string]map[string]index.Range{
  "foo": {"bar": index.Range{End: 8}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -378,7 +378,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 inputPostings: map[string]map[string]index.Range{
  "foo": {"bar": index.Range{End: 8}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -395,7 +395,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"baz": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -410,7 +410,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"baz": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -427,7 +427,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -444,7 +444,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -461,7 +461,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -479,7 +479,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 108}},
  "bar": {"foo": index.Range{Start: 108, End: 116}, "bar": index.Range{Start: 116, End: 124}, "baz": index.Range{Start: 124, End: 132}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -497,7 +497,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -515,7 +515,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}, "bar": index.Range{Start: 16, End: 24}, "baz": index.Range{Start: 24, End: 32}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -533,7 +533,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {addAll: true, name: "foo", removeKeys: []string{"bar"}},
@@ -552,7 +552,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {addAll: true, name: "foo", removeKeys: []string{"bar"}},
@@ -569,7 +569,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 1000012}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {addAll: true, name: "foo", removeKeys: []string{"bar"}},
@@ -586,7 +586,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 16}},
 },
- seriesMaxSize:    1,
+ seriesSize:       1,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -603,7 +603,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "foo": {"bar": index.Range{End: 8}},
  "bar": {"foo": index.Range{Start: 8, End: 1000012}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -621,7 +621,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "bar": {"foo": index.Range{Start: 8, End: 1000012}},
  "cluster": {"us": index.Range{Start: 1000012, End: 1000020}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {name: "foo", addKeys: []string{"bar"}},
@@ -641,7 +641,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}},
  "cluster": {"us": index.Range{Start: 32, End: 108}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroupMaxKeySeriesRatio: 2,
 postingGroups: []*postingGroup{
@@ -662,7 +662,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "bar": {"bar": index.Range{Start: 8, End: 16}, "baz": index.Range{Start: 16, End: 24}, "foo": index.Range{Start: 24, End: 32}},
  "cluster": {"us": index.Range{Start: 32, End: 108}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroupMaxKeySeriesRatio: 2,
 postingGroups: []*postingGroup{
@@ -683,7 +683,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "bar": {"foo": index.Range{Start: 8, End: 1000012}},
  "cluster": {"us": index.Range{Start: 1000012, End: 1000020}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {addAll: true, name: "foo", removeKeys: []string{"bar"}},
@@ -704,7 +704,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
  "baz": {"foo": index.Range{Start: 2012, End: 4020}},
  "cluster": {"us": index.Range{Start: 4020, End: 1004024}},
 },
- seriesMaxSize:    1000,
+ seriesSize:       1000,
 seriesMatchRatio: 0.5,
 postingGroups: []*postingGroup{
  {addAll: true, name: "foo", removeKeys: []string{"bar"}},
@@ -723,12 +723,12 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 t.Run(tc.name, func(t *testing.T) {
  headerReader := &mockIndexHeaderReader{postings: tc.inputPostings, err: tc.inputError}
  registry := prometheus.NewRegistry()
- block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
+ block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil, nil)
 testutil.Ok(t, err)
 ir := newBucketIndexReader(block, logger)
 dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"})
 dummyCounterVec := promauto.With(registry).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
- pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, tc.postingGroupMaxKeySeriesRatio, dummyCounter, dummyCounterVec)
+ pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesSize, tc.seriesMatchRatio, tc.postingGroupMaxKeySeriesRatio, dummyCounter, dummyCounterVec)
 if err != nil {
  testutil.Equals(t, tc.expectedError, err.Error())
  return
diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go
index 5d784a9cc0..53cc895f22 100644
--- a/pkg/testutil/e2eutil/prometheus.go
+++ b/pkg/testutil/e2eutil/prometheus.go
@@ -651,7 +651,14 @@ func createBlock(
   Downsample: metadata.ThanosDownsample{Resolution: resolution},
   Source:     metadata.TestSource,
   Files:      files,
-  IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
+  // For simplicity, use the same series size for all series size fields.
+  IndexStats: metadata.IndexStats{
+    SeriesMaxSize:   seriesSize,
+    SeriesP90Size:   seriesSize,
+    SeriesP99Size:   seriesSize,
+    SeriesP999Size:  seriesSize,
+    SeriesP9999Size: seriesSize,
+  },
  }, nil); err != nil {
   return id, errors.Wrap(err, "finalize block")
  }
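Aside: a rough, hypothetical illustration (numbers invented, not code from the patch) of why the configured statistic matters. In `optimizePostingsFetchByDownloadedBytes`, the bytes attributed to fetching series scale with `seriesMatched * seriesSize`, so swapping an outlier-dominated max for a p99 or p90 changes how expensive each posting group's series look relative to downloading its postings:

```go
package main

import "fmt"

func main() {
	// Hypothetical posting group matching 10k series in a block whose
	// max series size is inflated by a single 64KB outlier.
	const seriesMatched = 10000
	stats := []struct {
		name string
		size int64 // estimated bytes per series under this statistic
	}{
		{"max", 64 * 1024},
		{"p99", 350},
		{"p90", 120},
	}
	for _, s := range stats {
		// Mirrors the shape of the series-byte estimate used when deciding
		// whether a posting group should be marked lazy.
		fmt.Printf("%-4s -> estimated series bytes: %d\n", s.name, seriesMatched*s.size)
	}
}
```

An outlier-dominated max overstates the series bytes a posting group could save, which skews the lazy-versus-eager decision; a percentile keeps the estimate closer to the typical cost. That trade-off is what the `--estimated-series-size-stat` flag exposes.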