Commit 184f695: Fix CI

Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski committed Oct 31, 2022
1 parent 11c03fa

Showing 4 changed files with 77 additions and 31 deletions.
5 changes: 5 additions & 0 deletions cmd/thanos/store.go
@@ -56,6 +56,7 @@ type storeConfig struct {
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
seriesBatchSize int
maxSampleCount uint64
maxTouchedSeriesCount uint64
maxDownloadedBytes units.Base2Bytes
@@ -129,6 +130,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&sc.blockMetaFetchConcurrency)

cmd.Flag("debug.series-batch-size", "The batch size when fetching series from object storage.").
Hidden().Default("10000").IntVar(&sc.seriesBatchSize)

sc.filterConf = &store.FilterConfig{}

cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -338,6 +342,7 @@ func runStore(
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
store.WithChunkHashCalculation(true),
store.WithSeriesBatchSize(conf.seriesBatchSize),
}

if conf.debugLogging {
90 changes: 60 additions & 30 deletions pkg/store/bucket.go
@@ -103,6 +103,9 @@ const (
minBlockSyncConcurrency = 1

enableChunkHashCalculation = true

// The default batch size when fetching series from object storage.
defaultSeriesBatchSize = 10000
)

var (
@@ -309,6 +312,7 @@ type BucketStore struct {
indexReaderPool *indexheader.ReaderPool
buffers sync.Pool
chunkPool pool.Bytes
seriesBatchSize int

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
mtx sync.RWMutex
@@ -423,6 +427,12 @@ func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption
}
}

func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption {
return func(s *BucketStore) {
s.seriesBatchSize = seriesBatchSize
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
@@ -464,6 +474,7 @@ func NewBucketStore(
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: defaultSeriesBatchSize,
}

for _, option := range options {
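
The hunk above shows how the new option is wired in: NewBucketStore first fills in defaults (including seriesBatchSize: defaultSeriesBatchSize) and then applies each BucketStoreOption in turn, so WithSeriesBatchSize overrides the default only when a caller passes it. Below is a minimal, self-contained sketch of that functional-option pattern; miniStore, Option, and newMiniStore are hypothetical names used for illustration, not Thanos APIs.

package main

import "fmt"

// miniStore stands in for BucketStore; only the field relevant here is kept.
type miniStore struct{ seriesBatchSize int }

// Option mirrors the BucketStoreOption shape: a function that mutates the store.
type Option func(*miniStore)

func WithSeriesBatchSize(n int) Option {
	return func(s *miniStore) { s.seriesBatchSize = n }
}

func newMiniStore(opts ...Option) *miniStore {
	// Defaults are set first, then options override them, as in NewBucketStore.
	s := &miniStore{seriesBatchSize: 10000}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	fmt.Println(newMiniStore().seriesBatchSize)                         // 10000
	fmt.Println(newMiniStore(WithSeriesBatchSize(500)).seriesBatchSize) // 500
}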
@@ -790,32 +801,29 @@ type seriesEntry struct {
chks []storepb.AggrChunk
}

const batchSize = 10000

// blockSeriesClient is a storepb.Store_SeriesClient for a
// single TSDB block in object storage.
type blockSeriesClient struct {
grpc.ClientStream
ctx context.Context
logger log.Logger
extLset labels.Labels
blockMatchers []*labels.Matcher
ctx context.Context
logger log.Logger
extLset labels.Labels

postings []storage.SeriesRef
i int
mint int64
maxt int64
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
limiter ChunksLimiter
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

skipChunks bool
shardMatcher *storepb.ShardMatcher
calculateChunkHash bool

// Internal state
i int
postings []storage.SeriesRef
symbolizedLset []symbolizedLabel
entries []seriesEntry
batch []*storepb.SeriesResponse
@@ -831,23 +839,24 @@ func newBlockSeriesClient(
bytesLimiter BytesLimiter,
shardMatcher *storepb.ShardMatcher,
calculateChunkHash bool,
batchSize int,
) *blockSeriesClient {
var chunkr *bucketChunkReader
if !req.SkipChunks {
chunkr = b.chunkReader()
}

return &blockSeriesClient{
ctx: ctx,
logger: logger,
extLset: b.extLset,
mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
limiter: limiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
ctx: ctx,
logger: logger,
extLset: b.extLset,
mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
chunksLimiter: limiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,

loadAggregates: req.Aggregates,
shardMatcher: shardMatcher,
@@ -866,7 +875,11 @@ func (b *blockSeriesClient) Close() {
}

func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
return stats.merge(b.indexr.stats)
stats = stats.merge(b.indexr.stats)
if !b.skipChunks {
stats = stats.merge(b.chunkr.stats)
}
return stats
}

func (b *blockSeriesClient) ExpandPostings(
@@ -912,7 +925,7 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {

func (b *blockSeriesClient) nextBatch() error {
start := b.i
end := start + batchSize
end := start + defaultSeriesBatchSize
if end > len(b.postings) {
end = len(b.postings)
}
@@ -940,8 +953,7 @@ func (b *blockSeriesClient) nextBatch() error {
var chks []chunks.Meta
ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt)
if err != nil {
b.batch = append(b.batch, storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")))
continue
return errors.Wrap(err, "read series")
}
if !ok {
continue
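
The windowing at the top of nextBatch is the core of this change: instead of materializing all series for a block at once, the client walks the expanded postings in slices of at most the configured batch size, clamping the final slice to the end of the postings list. A minimal, self-contained sketch of that pattern follows; the postings values and batch size are made up for illustration.

package main

import "fmt"

func main() {
	postings := []uint64{11, 12, 13, 14, 15, 16, 17}
	batchSize := 3

	for start := 0; start < len(postings); start += batchSize {
		end := start + batchSize
		if end > len(postings) {
			end = len(postings) // clamp the last batch, as nextBatch does
		}
		// Each iteration would load series only for postings[start:end],
		// bounding how much series data is held in memory at a time.
		fmt.Println(postings[start:end])
	}
}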
@@ -982,7 +994,7 @@ func (b *blockSeriesClient) nextBatch() error {
}

// Ensure sample limit through chunksLimiter if we return chunks.
if err := b.limiter.Reserve(uint64(len(chks))); err != nil {
if err := b.chunksLimiter.Reserve(uint64(len(chks))); err != nil {
return errors.Wrap(err, "exceeded chunks limit")
}

@@ -1149,7 +1161,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
ctx = srv.Context()
stats = &queryStats{}
res []respSet
respSets []respSet
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
@@ -1202,6 +1214,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
bytesLimiter,
shardMatcher,
s.enableChunkHashCalculation,
s.seriesBatchSize,
)
defer blockClient.Close()

@@ -1236,7 +1249,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
)

mtx.Lock()
res = append(res, part)
respSets = append(respSets, part)
mtx.Unlock()

return nil
@@ -1288,18 +1301,32 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
return status.Error(code, err.Error())
}
stats.blocksQueried = len(res)
stats.blocksQueried = len(respSets)
stats.GetAllDuration = time.Since(begin)
s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds())
s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
}

// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
defer func() {
for _, resp := range respSets {
resp.Close()
}
}()
begin := time.Now()
set := NewProxyResponseHeap(res...)
set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...))
for set.Next() {
at := set.At()
warn := at.GetWarning()
if warn != "" {
// TODO(fpetkovski): Consider deprecating string based warnings in favor of a
// separate protobuf message containing the grpc code and
// a human readable error message.
err = status.Error(storepb.GRPCCodeFromWarn(warn), at.GetWarning())
return
}

series := at.GetSeries()
if series != nil {
stats.mergedSeriesCount++
@@ -1318,6 +1345,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie

err = nil
})
if err != nil {
return err
}

if s.enableSeriesResponseHints {
var anyHints *types.Any
@@ -1429,7 +1459,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
MaxTime: req.End,
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true)
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize)

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
@@ -1604,7 +1634,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
MaxTime: req.End,
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true)
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, defaultSeriesBatchSize)

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
@@ -2444,7 +2444,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
// must be called only from the goroutine running the Benchmark function.
testutil.Ok(b, err)

blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false)
blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, defaultSeriesBatchSize)
testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter, dummyCounter))
defer blockClient.Close()

11 changes: 11 additions & 0 deletions pkg/store/storepb/custom.go
@@ -14,6 +14,7 @@ import (
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/store/labelpb"
)
@@ -51,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}
}

func GRPCCodeFromWarn(warn string) codes.Code {
if strings.Contains(warn, "rpc error: code = ResourceExhausted") {
return codes.ResourceExhausted
}
if strings.Contains(warn, "rpc error: code = Code(422)") {
return 422
}
return codes.Unknown
}

type emptySeriesSet struct{}

func (emptySeriesSet) Next() bool { return false }
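
The new GRPCCodeFromWarn helper is what lets the Series merge loop in bucket.go turn a string warning back into an error with a meaningful gRPC status code. A small, self-contained sketch of the same mapping, reimplemented locally for illustration; it only distinguishes the two substrings matched in the diff and falls back to codes.Unknown otherwise.

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
)

// grpcCodeFromWarn mirrors the storepb.GRPCCodeFromWarn helper added above.
func grpcCodeFromWarn(warn string) codes.Code {
	if strings.Contains(warn, "rpc error: code = ResourceExhausted") {
		return codes.ResourceExhausted
	}
	if strings.Contains(warn, "rpc error: code = Code(422)") {
		return 422
	}
	return codes.Unknown
}

func main() {
	fmt.Println(grpcCodeFromWarn("rpc error: code = ResourceExhausted desc = limit hit")) // ResourceExhausted
	fmt.Println(grpcCodeFromWarn("some other warning"))                                   // Unknown
}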
