Skip to content

Commit

Permalink
Ensured index cache is best effort, refactored tests, validated edge …
Browse files Browse the repository at this point in the history
…cases.

Fixes #651

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Apr 23, 2019
1 parent da70cb0 commit 828a81e
Show file tree
Hide file tree
Showing 6 changed files with 622 additions and 316 deletions.
29 changes: 19 additions & 10 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
"github.com/improbable-eng/thanos/pkg/runutil"
storecache "github.com/improbable-eng/thanos/pkg/store/cache"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/strutil"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand Down Expand Up @@ -182,7 +183,7 @@ type BucketStore struct {
metrics *bucketStoreMetrics
bucket objstore.BucketReader
dir string
indexCache *indexCache
indexCache *storecache.IndexCache
chunkPool *pool.BytesPool

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
Expand Down Expand Up @@ -225,10 +226,18 @@ func NewBucketStore(
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

indexCache, err := newIndexCache(reg, indexCacheSizeBytes)
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
SetTimeout: 300 * time.Millisecond, // TODO(bwplotka): Add as a flag?
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return nil, errors.Wrap(err, "create index cache")
}

chunkPool, err := pool.NewBytesPool(2e5, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
Expand Down Expand Up @@ -1058,7 +1067,7 @@ type bucketBlock struct {
bucket objstore.BucketReader
meta *metadata.Meta
dir string
indexCache *indexCache
indexCache *storecache.IndexCache
chunkPool *pool.BytesPool

indexVersion int
Expand All @@ -1081,7 +1090,7 @@ func newBucketBlock(
bkt objstore.BucketReader,
id ulid.ULID,
dir string,
indexCache *indexCache,
indexCache *storecache.IndexCache,
chunkPool *pool.BytesPool,
p partitioner,
) (b *bucketBlock, err error) {
Expand Down Expand Up @@ -1241,13 +1250,13 @@ type bucketIndexReader struct {
block *bucketBlock
dec *index.Decoder
stats *queryStats
cache *indexCache
cache *storecache.IndexCache

mtx sync.Mutex
loadedSeries map[uint64][]byte
}

func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *indexCache) *bucketIndexReader {
func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *storecache.IndexCache) *bucketIndexReader {
r := &bucketIndexReader{
logger: logger,
ctx: ctx,
Expand Down Expand Up @@ -1415,7 +1424,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
if b, ok := r.cache.postings(r.block.meta.ULID, key); ok {
if b, ok := r.cache.Postings(r.block.meta.ULID, key); ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

Expand Down Expand Up @@ -1487,7 +1496,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {

// Return postings and fill LRU cache.
groups[p.groupID].Fill(p.keyID, fetchedPostings)
r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
r.cache.SetPostings(ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
Expand All @@ -1510,7 +1519,7 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
var newIDs []uint64

for _, id := range ids {
if b, ok := r.cache.series(r.block.meta.ULID, id); ok {
if b, ok := r.cache.Series(r.block.meta.ULID, id); ok {
r.loadedSeries[id] = b
continue
}
Expand Down Expand Up @@ -1567,7 +1576,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
}
c = c[n : n+int(l)]
r.loadedSeries[id] = c
r.cache.setSeries(r.block.meta.ULID, id, c)
r.cache.SetSeries(ctx, r.block.meta.ULID, id, c)
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/objtesting"
"github.com/improbable-eng/thanos/pkg/runutil"
storecache "github.com/improbable-eng/thanos/pkg/store/cache"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -310,7 +311,10 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
t.Log("Run ", i)

// Always clean cache before each test.
s.store.indexCache, err = newIndexCache(nil, 100)
s.store.indexCache, err = storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{
MaxSizeBytes: 100,
MaxItemSizeBytes: 100,
})
testutil.Ok(t, err)

srv := newStoreSeriesServer(ctx)
Expand Down
216 changes: 0 additions & 216 deletions pkg/store/cache.go

This file was deleted.

Loading

0 comments on commit 828a81e

Please sign in to comment.