Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose more series size stats and use in lazy posting #7957

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options
- [#7353](https://github.com/thanos-io/thanos/pull/7353) [#8045](https://github.com/thanos-io/thanos/pull/8045) Receiver/StoreGateway: Add `--matcher-cache-size` option to enable caching for regex matchers in series calls.
- [#8017](https://github.com/thanos-io/thanos/pull/8017) Store Gateway: Use native histogram for binary reader load and download duration and fixed download duration metric. #8017
- [#7957](https://github.com/thanos-io/thanos/pull/7957) Compact, Store Gateway: Compactor will set series p90, p99, p999 and p9999 size in `meta.json` file. Lazy expanded postings will choose estimated series size based on `estimated-series-size-stat` flag.

### Changed

Expand Down
6 changes: 6 additions & 0 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ func processDownsampling(
}
if stats.SeriesMaxSize > 0 {
meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
meta.Thanos.IndexStats.SeriesAvgSize = stats.SeriesAvgSize
meta.Thanos.IndexStats.SeriesP90Size = stats.SeriesP90Size
meta.Thanos.IndexStats.SeriesP99Size = stats.SeriesP99Size
meta.Thanos.IndexStats.SeriesP999Size = stats.SeriesP999Size
meta.Thanos.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
meta.Thanos.IndexStats.SeriesSizeStdDev = stats.SeriesSizeStdDev
}
if err := meta.WriteToDir(logger, resdir); err != nil {
return errors.Wrap(err, "write meta")
Expand Down
49 changes: 49 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedSeriesSizeStat string
estimatedSeriesSizeZScore float64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
Expand Down Expand Up @@ -167,6 +169,13 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize)

cmd.Flag("estimated-series-size-stat", "Statistic to use to estimate block series size. This is currently used for lazy expanded posting series size estimation. Available options are max, p90, p99, p99, p9999 and zscore. If zscore is picked, the actual zscore value is set via estimated-series-size-zscore.").
Default(string(store.BlockSeriesSizeZScore)).
EnumVar(&sc.estimatedSeriesSizeStat, string(store.BlockSeriesSizeMax), string(store.BlockSeriesSizeP90), string(store.BlockSeriesSizeP99), string(store.BlockSeriesSizeP999), string(store.BlockSeriesSizeP9999), string(store.BlockSeriesSizeZScore))

cmd.Flag("estimated-series-size-zscore", "Zscore is a statistical measurement that describes a value's relationship to the mean series size. Zscore 2 is calculated as average size + 2 * standard deviation. Use a larger zscore if you want a larger estimated series size. Default value is 2. Cannot be lower than 0.").
Default("2").Float64Var(&sc.estimatedSeriesSizeZScore)

cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize)

Expand Down Expand Up @@ -414,6 +423,11 @@ func runStore(
return errors.Wrap(err, "create chunk pool")
}

estimatedSeriesSizeStat := strings.ToLower(conf.estimatedSeriesSizeStat)
if estimatedSeriesSizeStat == string(store.BlockSeriesSizeZScore) && conf.estimatedSeriesSizeZScore < 0 {
return errors.Errorf("estimated series size zscore cannot be lower than 0 (got %v)", conf.estimatedSeriesSizeZScore)
}

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
Expand All @@ -438,6 +452,41 @@ func runStore(
}
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedSeriesSizeFunc(func(m metadata.Meta) uint64 {
switch estimatedSeriesSizeStat {
case string(store.BlockSeriesSizeMax):
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
case string(store.BlockSeriesSizeP90):
if m.Thanos.IndexStats.SeriesP90Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP90Size)
}
case string(store.BlockSeriesSizeP99):
if m.Thanos.IndexStats.SeriesP99Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP99Size)
}
case string(store.BlockSeriesSizeP999):
if m.Thanos.IndexStats.SeriesP999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP999Size)
}
case string(store.BlockSeriesSizeP9999):
if m.Thanos.IndexStats.SeriesP9999Size > 0 {
return uint64(m.Thanos.IndexStats.SeriesP9999Size)
}
case string(store.BlockSeriesSizeZScore):
if m.Thanos.IndexStats.SeriesSizeStdDev > 0 && m.Thanos.IndexStats.SeriesAvgSize > 0 {
return uint64(float64(m.Thanos.IndexStats.SeriesSizeStdDev)*conf.estimatedSeriesSizeZScore) + uint64(m.Thanos.IndexStats.SeriesAvgSize)
}
}

// Always fallback to series max size if none of other stats available.
if m.Thanos.IndexStats.SeriesMaxSize > 0 {
return uint64(m.Thanos.IndexStats.SeriesMaxSize)
}
// If series max size not available from the metadata, fallback to the configured default.
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedMaxChunkFunc(func(m metadata.Meta) uint64 {
if m.Thanos.IndexStats.ChunkMaxSize > 0 &&
uint64(m.Thanos.IndexStats.ChunkMaxSize) < conf.estimatedMaxChunkSize {
Expand Down
15 changes: 15 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ Flags:
--no-cache-index-header option is specified.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
--estimated-series-size-stat=zscore
Statistic to use to estimate block series size.
This is currently used for lazy expanded
posting series size estimation. Available
options are max, p90, p99, p99, p9999 and
zscore. If zscore is picked, the actual zscore
value is set via estimated-series-size-zscore.
--estimated-series-size-zscore=2
Zscore is a statistical measurement that
describes a value's relationship to the mean
series size. Zscore 2 is calculated as average
size + 2 * standard deviation. Use a larger
zscore if you want a larger estimated series
size. Default value is 2. Cannot be lower than
0.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.0
require (
capnproto.org/go/capnp/v3 v3.0.0-alpha.30
cloud.google.com/go/trace v1.10.12
github.com/DataDog/sketches-go v1.4.6
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3
github.com/KimMachineGun/automemlimit v0.6.1
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU=
github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/sketches-go v1.4.6 h1:acd5fb+QdUzGrosfNLwrIhqyrbMORpvBy7mE+vHlT3I=
github.com/DataDog/sketches-go v1.4.6/go.mod h1:7Y8GN8Jf66DLyDhc94zuWA3uHEt/7ttt8jHOBWWrSOg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3 h1:i84ZOPT35YCJROyuf97VP/VEdYhQce/8NTLOWq5tqJw=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.8.3/go.mod h1:3+qm+VCJbVmQ9uscVz+8h1rRkJEy9ZNFGgpT1XB9mPg=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 h1:FhsH8qgWFkkPlPXBZ68uuT/FH/R+DLTtVPxjLEBs1v4=
Expand Down
21 changes: 13 additions & 8 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/extprom"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -144,7 +143,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 756, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
Expand Down Expand Up @@ -185,7 +184,13 @@ func TestUpload(t *testing.T) {
}
],
"index_stats": {
"series_max_size": 16
"series_max_size": 16,
"series_avg_size": 16,
"series_p90_size": 16,
"series_p99_size": 16,
"series_p999_size": 16,
"series_p9999_size": 16,
"series_size_stddev": 1
}
}
}
Expand All @@ -197,7 +202,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 756, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down Expand Up @@ -229,7 +234,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
testutil.Equals(t, 735, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

Expand Down
92 changes: 86 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/DataDog/sketches-go/ddsketch"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -85,9 +86,14 @@ type HealthStats struct {
ChunkAvgSize int64
ChunkMaxSize int64

SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesMinSize int64
SeriesAvgSize int64
SeriesMaxSize int64
SeriesP9999Size int64
SeriesP999Size int64
SeriesP99Size int64
SeriesP90Size int64
SeriesSizeStdDev int64

SingleSampleSeries int64
SingleSampleChunks int64
Expand Down Expand Up @@ -209,6 +215,75 @@ func (n *minMaxSumInt64) Avg() int64 {
return n.sum / n.cnt
}

// sketch is a wrapper for DDSketch which allows to calculate quantile values with a relative accuracy.
type sketch struct {
cnt int64
sum int64
sum2 int64
s *ddsketch.DDSketch
}

func newSketch() *sketch {
// Always valid if accuracy is within (0, 1).
// Hardcode 0.1 relative accuracy as we only need int precision.
dd, _ := ddsketch.NewDefaultDDSketch(0.1)
return &sketch{s: dd}
}

func (s *sketch) Add(v int64) {
s.cnt++
s.sum += v
s.sum2 += v * v
// Impossible to happen since v should > 0.
_ = s.s.Add(float64(v))
}

func (s *sketch) Avg() int64 {
if s.cnt == 0 {
return 0
}
return s.sum / s.cnt
}

func (s *sketch) Max() int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if sketch is not empty.
v, _ := s.s.GetMaxValue()
return int64(v)
}

func (s *sketch) Min() int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if sketch is not empty.
v, _ := s.s.GetMinValue()
return int64(v)
}

func (s *sketch) StdDev() float64 {
if s.cnt == 0 {
return 0
}
mean := s.sum / s.cnt
return math.Sqrt(float64(s.sum2/s.cnt - mean*mean))
}

func (s *sketch) ZScore(z float64) int64 {
return int64(s.StdDev()*z) + s.Avg()
}

func (s *sketch) Quantile(quantile float64) int64 {
if s.cnt == 0 {
return 0
}
// Impossible to happen if quantile is valid and sketch is not empty.
v, _ := s.s.GetValueAtQuantile(quantile)
return int64(v)
}

// GatherIndexHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// helps to assess index health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
Expand Down Expand Up @@ -237,7 +312,7 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
seriesChunks = newMinMaxSumInt64()
chunkDuration = newMinMaxSumInt64()
chunkSize = newMinMaxSumInt64()
seriesSize = newMinMaxSumInt64()
seriesSize = newSketch()
)

lnames, err := r.LabelNames(ctx)
Expand Down Expand Up @@ -391,9 +466,14 @@ func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, m
stats.ChunkAvgSize = chunkSize.Avg()
stats.ChunkMinSize = chunkSize.min

stats.SeriesMaxSize = seriesSize.max
stats.SeriesMaxSize = seriesSize.Max()
stats.SeriesAvgSize = seriesSize.Avg()
stats.SeriesMinSize = seriesSize.min
stats.SeriesMinSize = seriesSize.Min()
stats.SeriesP90Size = seriesSize.Quantile(0.90)
stats.SeriesP99Size = seriesSize.Quantile(0.99)
stats.SeriesP999Size = seriesSize.Quantile(0.999)
stats.SeriesP9999Size = seriesSize.Quantile(0.9999)
stats.SeriesSizeStdDev = int64(seriesSize.StdDev())

stats.ChunkMaxDuration = time.Duration(chunkDuration.max) * time.Millisecond
stats.ChunkAvgDuration = time.Duration(chunkDuration.Avg()) * time.Millisecond
Expand Down
23 changes: 21 additions & 2 deletions pkg/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"path/filepath"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/efficientgo/core/testutil"
"github.com/stretchr/testify/require"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
Expand Down Expand Up @@ -96,3 +96,22 @@ func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
testutil.Equals(t, 1, stats.OutOfOrderChunks)
testutil.NotOk(t, stats.OutOfOrderChunksErr())
}

func TestSketch(t *testing.T) {
s := newSketch()
// Empty.
require.Equal(t, int64(0), s.cnt)
require.Equal(t, int64(0), s.Max())
require.Equal(t, int64(0), s.Min())
require.Equal(t, int64(0), s.Avg())
require.Equal(t, int64(0), s.Quantile(0.9))

s.Add(1)
s.Add(2)
s.Add(3)
require.Equal(t, int64(3), s.cnt)
require.Equal(t, int64(3), s.Max())
require.Equal(t, int64(1), s.Min())
require.Equal(t, int64(2), s.Avg())
require.Equal(t, int64(2), s.Quantile(0.9))
}
10 changes: 8 additions & 2 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,14 @@ type Thanos struct {
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
SeriesAvgSize int64 `json:"series_avg_size,omitempty"`
SeriesP90Size int64 `json:"series_p90_size,omitempty"`
SeriesP99Size int64 `json:"series_p99_size,omitempty"`
SeriesP999Size int64 `json:"series_p999_size,omitempty"`
SeriesP9999Size int64 `json:"series_p9999_size,omitempty"`
SeriesSizeStdDev int64 `json:"series_size_stddev,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

func (m *Thanos) ParseExtensions(v any) (any, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,12 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}
if stats.SeriesMaxSize > 0 {
thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
thanosMeta.IndexStats.SeriesAvgSize = stats.SeriesAvgSize
thanosMeta.IndexStats.SeriesP90Size = stats.SeriesP90Size
thanosMeta.IndexStats.SeriesP99Size = stats.SeriesP99Size
thanosMeta.IndexStats.SeriesP999Size = stats.SeriesP999Size
thanosMeta.IndexStats.SeriesP9999Size = stats.SeriesP9999Size
thanosMeta.IndexStats.SeriesSizeStdDev = stats.SeriesSizeStdDev
}
newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
if err != nil {
Expand Down
Loading
Loading