Skip to content

Commit

Permalink
Refactor consistency check so it can be reused.
Browse files Browse the repository at this point in the history
This is in anticipation of thanos-io/thanos#3469 where we would need to
utilise the consistency check to query for labels as well.

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve committed Nov 19, 2020
1 parent 46bdb1f commit 5f29f37
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 37 deletions.
103 changes: 67 additions & 36 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,55 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
minT, maxT = sp.Start, sp.End
}

var (
convertedMatchers = convertMatchersToLabelMatcher(matchers)
resSeriesSets = []storage.SeriesSet(nil)
resWarnings = storage.Warnings(nil)

maxChunksLimit = q.limits.MaxChunksPerQuery(q.userID)
leftChunksLimit = maxChunksLimit

resultMtx sync.Mutex
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
if err != nil {
return nil, err
}

resultMtx.Lock()

resSeriesSets = append(resSeriesSets, seriesSets...)
resWarnings = append(resWarnings, warnings...)

// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
// chunks we can still read before hitting the limit (max == 0 means disabled).
if maxChunksLimit > 0 {
leftChunksLimit -= numChunks
}

resultMtx.Unlock()

return queriedBlocks, nil
}

err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc)
if err != nil {
return storage.ErrSeriesSet(err)
}

if len(resSeriesSets) == 0 {
storage.EmptySeriesSet()
}

return series.NewSeriesSetWithWarnings(
storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
resWarnings)
}

func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64,
queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error)) error {
// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
// optimization is particularly important for the blocks storage because can be used to skip
Expand All @@ -325,43 +374,37 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))

if origMaxT != maxT {
level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
level.Debug(logger).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
}

if maxT < minT {
q.metrics.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "empty query time range after max time manipulation")
return storage.EmptySeriesSet()
level.Debug(logger).Log("msg", "empty query time range after max time manipulation")
return nil
}
}

// Find the list of blocks we need to query given the time range.
knownMetas, knownDeletionMarks, err := q.finder.GetBlocks(q.userID, minT, maxT)
if err != nil {
return storage.ErrSeriesSet(err)
return err
}

if len(knownMetas) == 0 {
q.metrics.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "no blocks found")
return storage.EmptySeriesSet()
level.Debug(logger).Log("msg", "no blocks found")
return nil
}

level.Debug(spanLog).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String())
level.Debug(logger).Log("msg", "found blocks to query", "expected", BlockMetas(knownMetas).String())

var (
// At the beginning the list of blocks to query are all known blocks.
remainingBlocks = getULIDsFromBlockMetas(knownMetas)
attemptedBlocks = map[ulid.ULID][]string{}
touchedStores = map[string]struct{}{}

convertedMatchers = convertMatchersToLabelMatcher(matchers)
resSeriesSets = []storage.SeriesSet(nil)
resWarnings = storage.Warnings(nil)
resQueriedBlocks = []ulid.ULID(nil)

maxChunksLimit = q.limits.MaxChunksPerQuery(q.userID)
leftChunksLimit = maxChunksLimit
resQueriedBlocks = []ulid.ULID(nil)
)

for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
Expand All @@ -372,32 +415,24 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
// If it's a retry and we get an error, it means there are no more store-gateways left
// from which running another attempt, so we're just stopping retrying.
if attempt > 1 {
level.Warn(spanLog).Log("msg", "unable to get store-gateway clients while retrying to fetch missing blocks", "err", err)
level.Warn(logger).Log("msg", "unable to get store-gateway clients while retrying to fetch missing blocks", "err", err)
break
}

return storage.ErrSeriesSet(err)
return err
}
level.Debug(spanLog).Log("msg", "found store-gateway instances to query", "num instances", len(clients), "attempt", attempt)
level.Debug(logger).Log("msg", "found store-gateway instances to query", "num instances", len(clients), "attempt", attempt)

// Fetch series from stores. If an error occur we do not retry because retries
// are only meant to cover missing blocks.
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
queriedBlocks, err := queryFunc(clients, minT, maxT)
if err != nil {
return storage.ErrSeriesSet(err)
return err
}
level.Debug(spanLog).Log("msg", "received series from all store-gateways", "queried blocks", strings.Join(convertULIDsToString(queriedBlocks), " "))
level.Debug(logger).Log("msg", "received series from all store-gateways", "queried blocks", strings.Join(convertULIDsToString(queriedBlocks), " "))

resSeriesSets = append(resSeriesSets, seriesSets...)
resWarnings = append(resWarnings, warnings...)
resQueriedBlocks = append(resQueriedBlocks, queriedBlocks...)

// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
// chunks we can still read before hitting the limit (max == 0 means disabled).
if maxChunksLimit > 0 {
leftChunksLimit -= numChunks
}

// Update the map of blocks we attempted to query.
for client, blockIDs := range clients {
touchedStores[client.RemoteAddress()] = struct{}{}
Expand All @@ -413,22 +448,18 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
q.metrics.storesHit.Observe(float64(len(touchedStores)))
q.metrics.refetches.Observe(float64(attempt - 1))

return series.NewSeriesSetWithWarnings(
storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge),
resWarnings)
return nil
}

level.Debug(spanLog).Log("msg", "consistency check failed", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(missingBlocks), " "))
level.Debug(logger).Log("msg", "consistency check failed", "attempt", attempt, "missing blocks", strings.Join(convertULIDsToString(missingBlocks), " "))

// The next attempt should just query the missing blocks.
remainingBlocks = missingBlocks
}

// We've not been able to query all expected blocks after all retries.
err = fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " "))
level.Warn(util.WithContext(spanCtx, spanLog)).Log("msg", "failed consistency check", "err", err)

return storage.ErrSeriesSet(err)
level.Warn(util.WithContext(ctx, logger)).Log("msg", "failed consistency check", "err", err)
return fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " "))
}

func (q *blocksStoreQuerier) fetchSeriesFromStores(
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T)
End: testData.queryMaxT,
}

set := q.selectSorted(sp, nil)
set := q.selectSorted(sp)
require.NoError(t, set.Err())

if testData.expectedMinT == 0 && testData.expectedMaxT == 0 {
Expand Down

0 comments on commit 5f29f37

Please sign in to comment.