This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Fix series cache growth
The approach uses timestamps to better understand which elements are getting evicted. If we see an eviction of a recently used or recently inserted element (as defined by `evictionMaxAge`), we attempt to grow the cache. `evictionMaxAge` defines the maximum age beyond which evictions are allowed without growing the cache.
This fixes the problem with series cache growth: the main idea is not to evict recently used elements, but to grow the cache instead.
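In short, the commit packs the CLOCK `used` flag and a last-touch timestamp into one 32-bit word per element, records the newest timestamp seen among evicted elements, and grows the cache whenever that timestamp falls within `evictionMaxAge` of the current time. Below is a minimal, self-contained Go sketch of that bit packing and the grow decision; the helper names mirror the ones added in `pkg/clockcache/cache.go` in this diff, while the `main` driver is purely illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// The leftmost bit holds the CLOCK "used" flag; the remaining 31 bits hold
// the last-touch time in Unix seconds (enough until roughly 2038).
func packUsedAndTimestamp(used bool, ts int32) uint32 {
	var usedBit uint32
	if used {
		usedBit = 1
	}
	return usedBit<<31 | uint32(ts)
}

func extractUsed(in uint32) bool       { return in>>31 == 1 }
func extractTimestamp(in uint32) int32 { return int32(in & ((1 << 31) - 1)) }

func main() {
	evictionMaxAge := 2 * time.Minute

	// Pretend an element that was last touched 30 seconds ago just got evicted.
	evicted := packUsedAndTimestamp(false, int32(time.Now().Add(-30*time.Second).Unix()))
	maxEvictionTs := extractTimestamp(evicted)

	// The eviction is younger than evictionMaxAge, so the cache is churning
	// recent entries and should grow instead of keeping its current size.
	shouldGrow := maxEvictionTs+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
	fmt.Println("recently used element evicted, grow cache:", shouldGrow) // prints: true
}
```

Because the packed word is still a single `uint32`, the `element` struct keeps its previous size, so the cache-line padding mentioned in the struct comment remains valid.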
niksajakovljevic committed Oct 24, 2022
1 parent 08abf8b commit 8578fe0
Showing 5 changed files with 154 additions and 39 deletions.
87 changes: 68 additions & 19 deletions pkg/clockcache/cache.go
@@ -27,6 +27,8 @@ type Cache struct {
dataSize uint64
// number of evictions
evictions uint64
// max eviction timestamp in unix seconds
maxEvictionTs int32

// guards next, and len(storage) and ensures that at most one eviction
// occurs at a time, always grabbed _before_ elementsLock
@@ -40,9 +42,13 @@ type element struct {
key interface{}
value interface{}

// CLOCK marker if this is recently used
used uint32
size uint64
// CLOCK marker showing whether this element was recently used, packed together with a timestamp.
// To avoid wasting memory, the `used` flag and the timestamp are packed into 32 bits:
// the leftmost bit records whether the element was used (1 or 0), and the remaining 31 bits hold the timestamp.
// The timestamp helps us decide whether we should attempt to grow the cache.
// It is stored in Unix seconds (31 bits are enough for roughly the next 15 years).
usedWithTs uint32
size uint64

// pad Elements out to be cache-aligned, see BenchmarkCacheFalseSharing
_ [16]byte
@@ -100,29 +106,34 @@ func (self *Cache) insert(key interface{}, value interface{}, size uint64) (exis
elem, present := self.elements[key]
if present {
// we'll count a double-insert as a hit. See the comment in get
if atomic.LoadUint32(&elem.used) != 0 {
atomic.StoreUint32(&elem.used, 1)
if !extractUsed(atomic.LoadUint32(&elem.usedWithTs)) {
atomic.StoreUint32(&elem.usedWithTs, packUsedAndTimestamp(true, int32(time.Now().Unix())))
}
return elem, false, true
}

var insertLocation *element
newElement := element{key: key, value: value, size: size, usedWithTs: packUsedAndTimestamp(false, int32(time.Now().Unix()))}
if len(self.storage) >= cap(self.storage) {
insertLocation = self.evict()
if insertLocation == nil {
return &element{key: key, value: value, size: size}, false, false
return &newElement, false, false
}
self.elementsLock.Lock()
defer self.elementsLock.Unlock()
delete(self.elements, insertLocation.key)
self.dataSize -= insertLocation.size
self.dataSize += size
self.evictions++
*insertLocation = element{key: key, value: value, size: size}
evictionTs := extractTimestamp(insertLocation.usedWithTs)
if self.maxEvictionTs < evictionTs {
self.maxEvictionTs = evictionTs
}
*insertLocation = newElement
} else {
self.elementsLock.Lock()
defer self.elementsLock.Unlock()
self.storage = append(self.storage, element{key: key, value: value, size: size})
self.storage = append(self.storage, newElement)
self.dataSize += size
insertLocation = &self.storage[len(self.storage)-1]
}
@@ -168,8 +179,8 @@ func (self *Cache) evict() (insertPtr *element) {
for i := 0; i < 2; i++ {
for next := range postStart {
elem := &postStart[next]
old := atomic.SwapUint32(&elem.used, 0)
if old == 0 {
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, elem.usedWithTs))
if !extractUsed(old) {
insertPtr = elem
}

@@ -180,8 +191,8 @@
}
for next := range preStart {
elem := &preStart[next]
old := atomic.SwapUint32(&elem.used, 0)
if old == 0 {
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, elem.usedWithTs))
if !extractUsed(old) {
insertPtr = elem
}

@@ -252,8 +263,8 @@ func (self *Cache) get(key interface{}) (interface{}, bool) {
// this is a read-only operation, and doesn't trash the cache line that used
// is stored on. The lack of atomicity of the update doesn't matter for our
// use case.
if atomic.LoadUint32(&elem.used) == 0 {
atomic.StoreUint32(&elem.used, 1)
if !extractUsed(atomic.LoadUint32(&elem.usedWithTs)) {
atomic.StoreUint32(&elem.usedWithTs, packUsedAndTimestamp(true, int32(time.Now().Unix())))
}
self.metrics.Inc(self.metrics.hitsTotal)

@@ -274,8 +285,8 @@ func (self *Cache) unmark(key string) bool {
// this is a read-only operation, and doesn't trash the cache line that used
// is stored on. The lack of atomicity of the update doesn't matter for our
// use case.
if atomic.LoadUint32(&elem.used) != 0 {
atomic.StoreUint32(&elem.used, 0)
if extractUsed(atomic.LoadUint32(&elem.usedWithTs)) {
atomic.StoreUint32(&elem.usedWithTs, setUsed(false, elem.usedWithTs))
}

return true
@@ -296,9 +307,9 @@ func (self *Cache) ExpandTo(newMax int) {
for i := range self.storage {
elem := &self.storage[i]
newStorage = append(newStorage, element{
key: elem.key,
value: elem.value,
used: atomic.LoadUint32(&elem.used),
key: elem.key,
value: elem.value,
usedWithTs: atomic.LoadUint32(&elem.usedWithTs),
})
}

@@ -313,6 +324,8 @@

self.elements = newElements
self.storage = newStorage
self.maxEvictionTs = 0
self.evictions = 0
}

func (self *Cache) Reset() {
@@ -328,6 +341,8 @@ func (self *Cache) Reset() {
self.elements = newElements
self.storage = newStorage
self.next = 0
self.maxEvictionTs = 0
self.evictions = 0
}

func (self *Cache) Len() int {
@@ -342,6 +357,12 @@ func (self *Cache) Evictions() uint64 {
return self.evictions
}

func (self *Cache) MaxEvictionTs() int32 {
self.elementsLock.RLock()
defer self.elementsLock.RUnlock()
return self.maxEvictionTs
}

func (self *Cache) SizeBytes() uint64 {
self.elementsLock.RLock()
defer self.elementsLock.RUnlock()
@@ -366,3 +387,31 @@ func (self *Cache) debugString() string {
}
return fmt.Sprintf("%s]", str)
}

// extractTimestamp skips the leftmost bit and converts the remaining 31 bits into a timestamp (Unix seconds)
func extractTimestamp(in uint32) int32 {
return int32(in & ((1 << 31) - 1))
}

// extractUsed reads the leftmost bit, which indicates whether the element has been used
func extractUsed(in uint32) bool {
return 1 == (in >> 31)
}

// setUsed sets the leftmost bit, which marks whether the element has been used
func setUsed(used bool, in uint32) uint32 {
if used {
return in | uint32(1<<31)
}
return in &^ uint32(1<<31)
}

// packUsedAndTimestamp packs the used flag and the timestamp into 32 bits
func packUsedAndTimestamp(used bool, ts int32) uint32 {
var usedBit int
if used {
usedBit = 1
}
return uint32(usedBit<<31) | uint32(ts)
}
4 changes: 3 additions & 1 deletion pkg/clockcache/cache_bench_test.go
@@ -5,6 +5,7 @@ import (
"math/rand"
"sync"
"testing"
"time"
)

// microbenchmark. measure the length of the Insert critical section
@@ -73,8 +74,9 @@ func BenchmarkEviction(b *testing.B) {
}

func (c *Cache) markAll() {
now := int32(time.Now().Unix())
for i := range c.storage {
c.storage[i].used = 2
c.storage[i].usedWithTs = packUsedAndTimestamp(true, now)
}
}

23 changes: 22 additions & 1 deletion pkg/clockcache/cache_test.go
@@ -6,12 +6,14 @@ package clockcache

import (
"fmt"
"github.com/stretchr/testify/require"
"math/rand"
"reflect"
"sync"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/require"
)

func TestWriteAndGetOnCache(t *testing.T) {
@@ -215,7 +217,12 @@ func TestExpand(t *testing.T) {
t.Errorf("Unexpected size %v", cache.SizeBytes())
}

require.Equal(t, int32(0), cache.MaxEvictionTs(), "unexpected eviction")

cache.Insert(4, 4, 20)
if cache.maxEvictionTs == 0 {
t.Errorf("Eviction timestamp missing")
}
expected = "[1: 1, 4: 4, 3: 3, ]"
if cache.debugString() != expected {
t.Errorf("unexpected cache\nexpected\n\t%s\nfound\n\t%s\n", expected, cache.debugString())
@@ -226,6 +233,7 @@
}

cache.ExpandTo(5)
require.Equal(t, int32(0), cache.MaxEvictionTs(), "max eviction timestamp not reset")
expected = "[1: 1, 4: 4, 3: 3, ]"
if cache.debugString() != expected {
t.Errorf("unexpected cache\nexpected\n\t%s\nfound\n\t%s\n", expected, cache.debugString())
@@ -353,3 +361,16 @@ func TestCacheEvictionOnWraparound(t *testing.T) {
assertCacheContains(cache, cachedItems)
}
}

func TestBitPacking(t *testing.T) {
timestamp := int32(time.Date(2020, time.January, 1, 1, 0, 0, 0, time.Local).UTC().Unix())
used := true
packed := packUsedAndTimestamp(used, int32(timestamp))
usedExt := extractUsed(packed)
require.Equal(t, used, usedExt, "used flag not properly extracted")
timestampExt := extractTimestamp(packed)
require.Equal(t, timestamp, timestampExt, "timestamp not properly extracted")
packed = setUsed(false, packed)
usedExt = extractUsed(packed)
require.Equal(t, false, usedExt, "used flag not properly set")
}
33 changes: 17 additions & 16 deletions pkg/pgmodel/cache/series_cache.go
@@ -24,9 +24,9 @@ import (
//this seems like a good initial size for /active/ series. Takes about 32MB
const DefaultSeriesCacheSize = 250000

const GrowCheckDuration = time.Minute // check whether to grow the series cache this often
const GrowEvictionThreshold = 0.2 // grow when evictions more than 20% of cache size
const GrowFactor = float64(2.0) // multiply cache size by this factor when growing the cache
const growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
const growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting elements younger than `now - evictionMaxAge`

// SeriesCache is a cache of model.Series entries.
type SeriesCache interface {
@@ -55,36 +55,37 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
}

func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
prev := uint64(0)
ticker := time.NewTicker(GrowCheckDuration)
ticker := time.NewTicker(growCheckDuration)
for {
select {
case <-ticker.C:
current := t.cache.Evictions()
newEvictions := current - prev
evictionsThresh := uint64(float64(t.Len()) * GrowEvictionThreshold)
prev = current
if newEvictions > evictionsThresh {
t.grow(newEvictions)
if t.shouldGrow() {
t.grow()
}
case <-sigClose:
return
}
}
}

func (t *SeriesCacheImpl) grow(newEvictions uint64) {
// shouldGrow allows cache growth if we are evicting elements that were recently used or recently inserted.
// evictionMaxAge defines how recent an eviction has to be to trigger growth.
func (t *SeriesCacheImpl) shouldGrow() bool {
return t.cache.MaxEvictionTs()+int32(evictionMaxAge.Seconds()) > int32(time.Now().Unix())
}

func (t *SeriesCacheImpl) grow() {
sizeBytes := t.cache.SizeBytes()
oldSize := t.cache.Cap()
if float64(sizeBytes)*1.2 >= float64(t.maxSizeBytes) {
log.Warn("msg", "Series cache is too small and cannot be grown",
"current_size_bytes", float64(sizeBytes), "max_size_bytes", float64(t.maxSizeBytes),
"current_size_elements", oldSize, "check_interval", GrowCheckDuration,
"new_evictions", newEvictions, "new_evictions_percent", 100*(float64(newEvictions)/float64(oldSize)))
"current_size_elements", oldSize, "check_interval", growCheckDuration,
"eviction_max_age", evictionMaxAge)
return
}

multiplier := GrowFactor
multiplier := growFactor
if float64(sizeBytes)*multiplier >= float64(t.maxSizeBytes) {
multiplier = float64(t.maxSizeBytes) / float64(sizeBytes)
}
@@ -97,7 +98,7 @@ func (t *SeriesCacheImpl) grow(newEvictions uint64) {
"new_size_elements", newNumElements, "current_size_elements", oldSize,
"new_size_bytes", float64(sizeBytes)*multiplier, "max_size_bytes", float64(t.maxSizeBytes),
"multiplier", multiplier,
"new_evictions", newEvictions, "new_evictions_percent", 100*(float64(newEvictions)/float64(oldSize)))
"eviction_max_age", evictionMaxAge)
t.cache.ExpandTo(newNumElements)
}

46 changes: 44 additions & 2 deletions pkg/pgmodel/cache/series_cache_test.go
@@ -6,11 +6,13 @@ package cache

import (
"bytes"
"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/prompb"
"math"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/prompb"

promLabels "github.com/prometheus/prometheus/model/labels"
)
@@ -52,3 +54,43 @@ func TestGenerateKey(t *testing.T) {
require.Equal(t, "test", metricName)
require.Equal(t, []byte("\x08\x00__name__\x04\x00test\x04\x00hell\x06\x00oworld\x05\x00hello\x05\x00world"), keyBuffer.Bytes())
}

func TestGrowSeriesCache(t *testing.T) {
testCases := []struct {
name string
sleep time.Duration
cacheGrowCnt int
}{
{
name: "Grow criteria satisfied - we shoulnd't be evicting elements",
sleep: time.Millisecond,
cacheGrowCnt: 1,
},
{
name: "Growth criteria not satisfied - we can keep evicting old elements",
sleep: time.Second * 2,
cacheGrowCnt: 0,
},
}

t.Setenv("IS_TEST", "true")
evictionMaxAge = time.Second // tweaking it to not wait too long
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cache := NewSeriesCache(Config{SeriesCacheInitialSize: 100, SeriesCacheMemoryMaxBytes: DefaultConfig.SeriesCacheMemoryMaxBytes}, nil)
cacheGrowCounter := 0
for i := 0; i < 200; i++ {
cache.cache.Insert(i, i, 1)
if i%100 == 0 {
time.Sleep(tc.sleep)
}
if cache.shouldGrow() {
cache.grow()
cacheGrowCounter++
}
}
require.Equal(t, tc.cacheGrowCnt, cacheGrowCounter, "unexpected series cache growth count")
})
}

}
