Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LRU cache ensureFits() tweaks WRT deadlocking #796

Merged
merged 5 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pkg/store/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type indexCache struct {
added *prometheus.CounterVec
current *prometheus.GaugeVec
currentSize *prometheus.GaugeVec
overflow *prometheus.CounterVec
}

// newIndexCache creates a new LRU cache for index entries and ensures the total cache
Expand All @@ -66,6 +67,11 @@ func newIndexCache(reg prometheus.Registerer, maxBytes uint64) (*indexCache, err
Help: "Total number of requests to the cache.",
}, []string{"item_type"})

c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_store_index_cache_items_overflowed_total",
Help: "Total number of items that could not be added to the cache due to being too big.",
}, []string{"item_type"})

c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_store_index_cache_hits_total",
Help: "Total number of requests to the cache that were a hit.",
Expand Down Expand Up @@ -115,10 +121,16 @@ func newIndexCache(reg prometheus.Registerer, maxBytes uint64) (*indexCache, err
return c, nil
}

func (c *indexCache) ensureFits(b []byte) {
// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
// Returns true if it will fit.
func (c *indexCache) ensureFits(b []byte) bool {
if uint64(len(b)) > c.maxSize {
return false
}
for c.curSize+uint64(len(b)) > c.maxSize {
c.lru.RemoveOldest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could do more aggressive removal if the slice is big enough.

Copy link
Member Author

@GiedriusS GiedriusS Feb 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but how the implementation would be like? I have two ideas:

  • If the slice size exceeds a certain threshold then we could remove 2 or more items at a time
  • Get all keys, calculate their sizes, and remove the top X items to make sure there's enough size for the item b?

Either way, the scope would definitely blow up since I do not think it is worth making these kinds of changes without scrupulous benchmarking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way, maybe lets leave it for another day? I am mainly interested now in making sure that the loop doesn't go forever

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

}
return true
}

func (c *indexCache) setPostings(b ulid.ULID, l labels.Label, v []byte) {
Expand All @@ -127,7 +139,10 @@ func (c *indexCache) setPostings(b ulid.ULID, l labels.Label, v []byte) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.ensureFits(v)
if !c.ensureFits(v) {
c.overflow.WithLabelValues(cacheTypePostings).Inc()
return
}

// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
Expand Down Expand Up @@ -158,7 +173,10 @@ func (c *indexCache) setSeries(b ulid.ULID, id uint64, v []byte) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.ensureFits(v)
if !c.ensureFits(v) {
c.overflow.WithLabelValues(cacheTypeSeries).Inc()
return
}

// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
Expand Down
22 changes: 22 additions & 0 deletions pkg/store/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Tests out the index cache implementation.
package store

import (
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/client_golang/prometheus"
)

// TestIndexCacheEdge tests the index cache edge cases.
func TestIndexCacheEdge(t *testing.T) {
metrics := prometheus.NewRegistry()
cache, err := newIndexCache(metrics, 1)
testutil.Ok(t, err)

fits := cache.ensureFits([]byte{42, 24})
testutil.Equals(t, fits, false)

fits = cache.ensureFits([]byte{42})
testutil.Equals(t, fits, true)
}