Store: make index cache shared cache friendly (thanos-io#1825)
* Renamed store cache.go to inmemory.go

Signed-off-by: Marco Pracucci <[email protected]>

* Refactored IndexCache to fetch multiple postings and series within a single call

Signed-off-by: Marco Pracucci <[email protected]>

* Added missing comments to exported functions from InMemoryIndexCache

Signed-off-by: Marco Pracucci <[email protected]>
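
For reference, here is the refactored IndexCache interface as it can be read off this diff. The method names and signatures are taken verbatim from the noopCache implementation in pkg/store/bucket_e2e_test.go below; the doc comments and import paths are reconstructed for illustration and may differ from the actual pkg/store/cache source:

```go
package storecache

import (
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/pkg/labels"
)

// IndexCache caches postings and series of a block's index.
type IndexCache interface {
	// StorePostings stores postings for a single label.
	StorePostings(blockID ulid.ULID, l labels.Label, v []byte)

	// FetchMultiPostings fetches postings for all given keys in one call and
	// returns the hits, plus the keys that were missed.
	FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)

	// StoreSeries stores a single series.
	StoreSeries(blockID ulid.ULID, id uint64, v []byte)

	// FetchMultiSeries fetches series for all given ids in one call and
	// returns the hits, plus the ids that were missed.
	FetchMultiSeries(blockID ulid.ULID, ids []uint64) (hits map[uint64][]byte, misses []uint64)
}
```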
pracucci authored and bwplotka committed Dec 4, 2019
1 parent 2190d58 commit c134ad1
Showing 6 changed files with 449 additions and 377 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
@@ -179,7 +179,7 @@ func runStore(
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

-	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
+	indexCache, err := storecache.NewInMemoryIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
48 changes: 23 additions & 25 deletions pkg/store/bucket.go
@@ -36,6 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tracing"
@@ -188,13 +189,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
return &m
}

-type indexCache interface {
-	SetPostings(b ulid.ULID, l labels.Label, v []byte)
-	Postings(b ulid.ULID, l labels.Label) ([]byte, bool)
-	SetSeries(b ulid.ULID, id uint64, v []byte)
-	Series(b ulid.ULID, id uint64) ([]byte, bool)
-}

// FilterConfig is a configuration, which Store uses for filtering metrics.
type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
@@ -207,7 +201,7 @@ type BucketStore struct {
metrics *bucketStoreMetrics
bucket objstore.BucketReader
dir string
-	indexCache indexCache
+	indexCache storecache.IndexCache
chunkPool *pool.BytesPool

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
@@ -241,7 +235,7 @@ func NewBucketStore(
reg prometheus.Registerer,
bucket objstore.BucketReader,
dir string,
-	indexCache indexCache,
+	indexCache storecache.IndexCache,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
@@ -1175,7 +1169,7 @@ type bucketBlock struct {
bucket objstore.BucketReader
meta *metadata.Meta
dir string
-	indexCache indexCache
+	indexCache storecache.IndexCache
chunkPool *pool.BytesPool

indexVersion int
@@ -1196,7 +1190,7 @@ func newBucketBlock(
meta *metadata.Meta,
bkt objstore.BucketReader,
dir string,
-	indexCache indexCache,
+	indexCache storecache.IndexCache,
chunkPool *pool.BytesPool,
p partitioner,
) (b *bucketBlock, err error) {
@@ -1358,13 +1352,13 @@ type bucketIndexReader struct {
block *bucketBlock
dec *index.Decoder
stats *queryStats
-	cache storecache
+	cache storecache.IndexCache

mtx sync.Mutex
loadedSeries map[uint64][]byte
}

-func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache indexCache) *bucketIndexReader {
+func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache storecache.IndexCache) *bucketIndexReader {
r := &bucketIndexReader{
logger: logger,
ctx: ctx,
@@ -1527,13 +1521,21 @@ type postingPtr struct {
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
var ptrs []postingPtr

+	// Fetch postings from the cache with a single call.
+	keys := make([]labels.Label, 0)
+	for _, g := range groups {
+		keys = append(keys, g.keys...)
+	}
+
+	fromCache, _ := r.cache.FetchMultiPostings(r.block.meta.ULID, keys)

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
-			if b, ok := r.cache.Postings(r.block.meta.ULID, key); ok {
+			if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

@@ -1604,7 +1606,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {

// Return postings and fill LRU cache.
groups[p.groupID].Fill(p.keyID, fetchedPostings)
-		r.cache.SetPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
+		r.cache.StorePostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
@@ -1620,16 +1622,12 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
const maxSeriesSize = 64 * 1024

-	var newIDs []uint64
-
-	for _, id := range ids {
-		if b, ok := r.cache.Series(r.block.meta.ULID, id); ok {
-			r.loadedSeries[id] = b
-			continue
-		}
-		newIDs = append(newIDs, id)
+	// Load series from cache, overwriting the list of ids to preload
+	// with the missing ones.
+	fromCache, ids := r.cache.FetchMultiSeries(r.block.meta.ULID, ids)
+	for id, b := range fromCache {
+		r.loadedSeries[id] = b
	}
-	ids = newIDs

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return ids[i], ids[i] + maxSeriesSize
@@ -1674,7 +1672,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
}
c = c[n : n+int(l)]
r.loadedSeries[id] = c
-		r.cache.SetSeries(r.block.meta.ULID, id, c)
+		r.cache.StoreSeries(r.block.meta.ULID, id, c)
}
return nil
}
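Both call sites above follow the same pattern this refactor enables: one batched cache lookup per request, after which only the misses are read from object storage and written back. A minimal, self-contained sketch of that flow (illustrative only; Cache, loadAll, and slowFetch are not names from this diff, and string keys stand in for the label/series keys):

```go
package store

// Cache mirrors the batched half of the IndexCache interface,
// specialized to string keys so the sketch stays self-contained.
type Cache interface {
	// FetchMulti returns the cached values plus the keys that were missed.
	FetchMulti(keys []string) (hits map[string][]byte, misses []string)
	// Store writes a single value back to the cache.
	Store(key string, v []byte)
}

// loadAll returns a value for every key: one batched cache lookup, then a
// backfill of the misses via slowFetch (standing in for the object-storage
// read), written back so the next request hits.
func loadAll(c Cache, keys []string, slowFetch func(string) []byte) map[string][]byte {
	hits, misses := c.FetchMulti(keys) // one call instead of len(keys) calls
	for _, k := range misses {
		v := slowFetch(k)
		c.Store(k, v) // write back for subsequent requests
		hits[k] = v
	}
	return hits
}
```

Because FetchMultiPostings and FetchMultiSeries return all hits and misses in a single call, an implementation can take its lock once per request (or, for a shared cache such as memcached, issue one batched round trip) rather than once per key, which is what makes the interface "shared cache" friendly.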
39 changes: 22 additions & 17 deletions pkg/store/bucket_e2e_test.go
@@ -37,33 +37,38 @@ var (

type noopCache struct{}

func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {}
func (noopCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { return nil, false }
func (noopCache) SetSeries(b ulid.ULID, id uint64, v []byte) {}
func (noopCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { return nil, false }
func (noopCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {}
func (noopCache) FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(blockID ulid.ULID, id uint64, v []byte) {}
func (noopCache) FetchMultiSeries(blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
return map[uint64][]byte{}, ids
}

type swappableCache struct {
-	ptr indexCache
+	ptr storecache.IndexCache
}

-func (c *swappableCache) SwapWith(ptr2 indexCache) {
+func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}

-func (c *swappableCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
-	c.ptr.SetPostings(b, l, v)
+func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
+	c.ptr.StorePostings(blockID, l, v)
}

-func (c *swappableCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
-	return c.ptr.Postings(b, l)
+func (c *swappableCache) FetchMultiPostings(blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
+	return c.ptr.FetchMultiPostings(blockID, keys)
}

-func (c *swappableCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
-	c.ptr.SetSeries(b, id, v)
+func (c *swappableCache) StoreSeries(blockID ulid.ULID, id uint64, v []byte) {
+	c.ptr.StoreSeries(blockID, id, v)
}

-func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
-	return c.ptr.Series(b, id)
+func (c *swappableCache) FetchMultiSeries(blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
+	return c.ptr.FetchMultiSeries(blockID, ids)
}

type storeSuite struct {
@@ -373,7 +378,7 @@ func TestBucketStore_e2e(t *testing.T) {
testBucketStore_e2e(t, ctx, s)

t.Log("Test with large, sufficient index cache")
-	indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+	indexCache, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 1e5,
MaxSizeBytes: 2e5,
})
@@ -382,7 +387,7 @@ func TestBucketStore_e2e(t *testing.T) {
testBucketStore_e2e(t, ctx, s)

t.Log("Test with small index cache")
-	indexCache2, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+	indexCache2, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 50,
MaxSizeBytes: 100,
})
@@ -416,7 +421,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {

s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig)

-	indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+	indexCache, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 1e5,
MaxSizeBytes: 2e5,
})
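Putting the test pieces together: the suite starts with noopCache, where every lookup is a miss, and later swaps a real in-memory cache in without rebuilding the store. A condensed sketch of that sequence using the types from the diff above (surrounding setup elided; testutil.Ok is assumed from the rest of the test file):

```go
cache := &swappableCache{}
cache.SwapWith(noopCache{}) // every fetch misses: exercises the object-storage path

// Later: swap in a real cache with a deliberately small budget.
indexCache, err := storecache.NewInMemoryIndexCache(s.logger, nil, storecache.Opts{
	MaxItemSizeBytes: 50,
	MaxSizeBytes:     100,
})
testutil.Ok(t, err)
cache.SwapWith(indexCache) // subsequent queries go through the LRU
```

Because swappableCache itself satisfies storecache.IndexCache, the BucketStore under test never observes the swap.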
