Tiered index cache with memcache as an option (grafana#982)
* Make Cache a generic interface which the FIFO cache also implements

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Make index cache use the generic interface

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Add memcache as a cache option to the Index

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Handle unicode keys with a hack, since memcache cannot deal with them.

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Do client-side expiry validation, as write-back might skew times

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Add metrics

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* protobuf and review feedback

Signed-off-by: Goutham Veeramachaneni <[email protected]>
gouthamve authored Sep 10, 2018
1 parent 513b510 commit 13d488b
Showing 21 changed files with 1,092 additions and 104 deletions.
13 changes: 13 additions & 0 deletions aws/storage_client.go
@@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

awscommon "github.com/weaveworks/common/aws"
"github.com/weaveworks/common/instrument"
@@ -159,6 +160,18 @@ type storageClient struct {
batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest
}

// Opts returns the chunk.StorageOpt's for the config.
func Opts(cfg StorageConfig, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) {
client, err := NewStorageClient(cfg, schemaCfg)
if err != nil {
return nil, err
}
return []chunk.StorageOpt{{
From: model.Time(0),
Client: client,
}}, err
}

// NewStorageClient makes a new AWS-backed StorageClient.
func NewStorageClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL)
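For context on the new Opts helper: a minimal sketch of how the returned chunk.StorageOpt slice might be consumed. Only the StorageOpt fields (From, Client) come from this diff; the function and variable names below, and the elided imports, are illustrative assumptions.

// Hypothetical caller, not part of the commit.
func printStorageOpts(cfg aws.StorageConfig, schemaCfg chunk.SchemaConfig) error {
	opts, err := aws.Opts(cfg, schemaCfg)
	if err != nil {
		return err
	}
	for _, opt := range opts {
		// Each option maps a start time to the storage client that serves it.
		fmt.Printf("from %v use %T\n", opt.From, opt.Client)
	}
	return nil
}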
6 changes: 3 additions & 3 deletions cache/background.go
@@ -77,8 +77,8 @@ func (c *backgroundCache) Stop() error {
return c.Cache.Stop()
}

// StoreChunk writes chunks for the cache in the background.
func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error {
// Store writes keys for the cache in the background.
func (c *backgroundCache) Store(ctx context.Context, key string, buf []byte) error {
bgWrite := backgroundWrite{
key: key,
buf: buf,
@@ -102,7 +102,7 @@ func (c *backgroundCache) writeBackLoop() {
return
}
queueLength.Dec()
err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf)
err := c.Cache.Store(context.Background(), bgWrite.key, bgWrite.buf)
if err != nil {
level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err)
}
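A note on the renamed Store above: it only enqueues the write, and writeBackLoop performs the real Cache.Store later. A minimal sketch of the resulting write-behind behaviour; only NewBackground and the Cache methods appear in this commit, the config and variable names are assumptions:

bg := cache.NewBackground(backgroundCfg, underlying) // wraps any cache.Cache
_ = bg.Store(ctx, "k1", []byte("v1"))                // returns as soon as the write is queued
// An immediate Fetch may still miss until writeBackLoop drains the queue.
_, _, missing, _ := bg.Fetch(ctx, []string{"k1"})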
4 changes: 2 additions & 2 deletions cache/background_test.go
@@ -13,14 +13,14 @@ type mockCache struct {
cache map[string][]byte
}

func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error {
func (m *mockCache) Store(_ context.Context, key string, buf []byte) error {
m.Lock()
defer m.Unlock()
m.cache[key] = buf
return nil
}

func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
m.Lock()
defer m.Unlock()
for _, key := range keys {
14 changes: 7 additions & 7 deletions cache/cache.go
@@ -7,8 +7,8 @@ import (

// Cache byte arrays by key.
type Cache interface {
StoreChunk(ctx context.Context, key string, buf []byte) error
FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
Store(ctx context.Context, key string, buf []byte) error
Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
Stop() error
}

@@ -48,18 +48,18 @@ func New(cfg Config) (Cache, error) {
if err != nil {
return nil, err
}
caches = append(caches, instrument("diskcache", cache))
caches = append(caches, Instrument("diskcache", cache))
}

if cfg.memcacheClient.Host != "" {
client := newMemcachedClient(cfg.memcacheClient)
client := NewMemcachedClient(cfg.memcacheClient)
cache := NewMemcached(cfg.memcache, client)
caches = append(caches, instrument("memcache", cache))
caches = append(caches, Instrument("memcache", cache))
}

var cache Cache = tiered(caches)
cache := NewTiered(caches)
if len(caches) > 1 {
cache = instrument("tiered", cache)
cache = Instrument("tiered", cache)
}

cache = NewBackground(cfg.background, cache)
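The Store/Fetch/Stop interface above is the contract every tier (FIFO, disk, memcache, tiered, background) now implements. A minimal usage sketch, assuming the cortex cache package import and made-up key names; everything except the three interface methods is illustrative:

// Imports elided; the cache package path is assumed.
func readThrough(ctx context.Context, c cache.Cache) error {
	if err := c.Store(ctx, "index:d17784:series42", []byte("entry")); err != nil {
		return err
	}
	found, bufs, missing, err := c.Fetch(ctx, []string{"index:d17784:series42", "absent-key"})
	if err != nil {
		return err
	}
	for i := range found {
		fmt.Printf("hit %s (%d bytes)\n", found[i], len(bufs[i]))
	}
	fmt.Printf("%d misses\n", len(missing))
	return nil
}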
8 changes: 4 additions & 4 deletions cache/cache_test.go
@@ -46,7 +46,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) {
require.NoError(t, err)

key := c.ExternalKey()
err = cache.StoreChunk(context.Background(), key, buf)
err = cache.Store(context.Background(), key, buf)
require.NoError(t, err)

keys = append(keys, key)
@@ -61,7 +61,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch
index := rand.Intn(len(keys))
key := keys[index]

found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{key})
found, bufs, missingKeys, err := cache.Fetch(context.Background(), []string{key})
require.NoError(t, err)
require.Len(t, found, 1)
require.Len(t, bufs, 1)
@@ -77,7 +77,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch

func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
// test getting them all
found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), keys)
found, bufs, missingKeys, err := cache.Fetch(context.Background(), keys)
require.NoError(t, err)
require.Len(t, found, len(keys))
require.Len(t, bufs, len(keys))
@@ -117,7 +117,7 @@ func (a byExternalKey) Less(i, j int) bool { return a[i].ExternalKey() < a[j].Ex
func testCacheMiss(t *testing.T, cache cache.Cache) {
for i := 0; i < 100; i++ {
key := strconv.Itoa(rand.Int())
found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key})
found, bufs, missing, err := cache.Fetch(context.Background(), []string{key})
require.NoError(t, err)
require.Empty(t, found)
require.Empty(t, bufs)
8 changes: 4 additions & 4 deletions cache/diskcache.go
@@ -78,8 +78,8 @@ func (d *Diskcache) Stop() error {
return d.f.Close()
}

// FetchChunkData get chunks from the cache.
func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
// Fetch get chunks from the cache.
func (d *Diskcache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
for _, key := range keys {
buf, ok := d.fetch(key)
if ok {
@@ -114,8 +114,8 @@ func (d *Diskcache) fetch(key string) ([]byte, bool) {
return result, true
}

// StoreChunk puts a chunk into the cache.
func (d *Diskcache) StoreChunk(ctx context.Context, key string, value []byte) error {
// Store puts a chunk into the cache.
func (d *Diskcache) Store(ctx context.Context, key string, value []byte) error {
d.mtx.Lock()
defer d.mtx.Unlock()

29 changes: 29 additions & 0 deletions cache/fifo_cache.go
@@ -101,6 +101,35 @@ func NewFifoCache(name string, size int, validity time.Duration) *FifoCache {
}
}

// Store implements Cache.
func (c *FifoCache) Store(ctx context.Context, key string, buf []byte) error {
c.Put(ctx, key, buf)

return nil
}

// Fetch implements Cache.
func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys))
for _, key := range keys {
val, ok := c.Get(ctx, key)
if !ok {
missing = append(missing, key)
continue
}

found = append(found, key)
bufs = append(bufs, val.([]byte))
}

return
}

// Stop implements Cache.
func (c *FifoCache) Stop() error {
return nil
}

// Put stores the value against the key.
func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put")
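With the three methods added above, the in-process FIFO cache satisfies the same Cache interface as the memcache and disk backends, so it can sit in front of them in a tiered cache. A small sketch; the name, size, and validity values are arbitrary examples:

fifo := cache.NewFifoCache("index-reads", 1024, 5*time.Minute)
_ = fifo.Store(ctx, "k1", []byte("v1"))
found, bufs, missing, _ := fifo.Fetch(ctx, []string{"k1", "k2"}) // "k2" lands in missing
_ = fifo.Stop() // no-op for the FIFO cache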
83 changes: 64 additions & 19 deletions cache/instrumented.go
@@ -2,6 +2,7 @@ package cache

import (
"context"
"time"

ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
@@ -21,13 +22,13 @@ var (
fetchedKeys = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cache_fetched_keys",
Help: "Total count of chunks requested from cache.",
Help: "Total count of keys requested from cache.",
}, []string{"name"})

hits = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cache_hits",
Help: "Total count of chunks found in cache.",
Help: "Total count of keys found in cache.",
}, []string{"name"})
)

@@ -37,7 +38,19 @@ func init() {
prometheus.MustRegister(hits)
}

func instrument(name string, cache Cache) Cache {
// Instrument returns an instrumented cache.
func Instrument(name string, cache Cache) Cache {
return &instrumentedCache{
name: name,
fetchedKeys: fetchedKeys.WithLabelValues(name),
hits: hits.WithLabelValues(name),
trace: true,
Cache: cache,
}
}

// MetricsInstrument returns an instrumented cache that only tracks metrics and not traces.
func MetricsInstrument(name string, cache Cache) Cache {
return &instrumentedCache{
name: name,
fetchedKeys: fetchedKeys.WithLabelValues(name),
@@ -49,36 +62,58 @@ type instrumentedCache struct {
type instrumentedCache struct {
name string
fetchedKeys, hits prometheus.Counter
trace bool
Cache
}

func (i *instrumentedCache) StoreChunk(ctx context.Context, key string, buf []byte) error {
return instr.TimeRequestHistogram(ctx, i.name+".store", requestDuration, func(ctx context.Context) error {
return i.Cache.StoreChunk(ctx, key, buf)
func (i *instrumentedCache) Store(ctx context.Context, key string, buf []byte) error {
method := i.name + ".store"
if i.trace {
return instr.TimeRequestHistogram(ctx, method, requestDuration, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.String("key", key))

return i.Cache.Store(ctx, key, buf)
})
}

return UntracedCollectedRequest(ctx, method, instr.NewHistogramCollector(requestDuration), instr.ErrorCode, func(ctx context.Context) error {
return i.Cache.Store(ctx, key, buf)
})
}

func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
var (
found []string
bufs [][]byte
missing []string
err error
method = i.name + ".fetch"
)
err := instr.TimeRequestHistogram(ctx, i.name+".fetch", requestDuration, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("chunks requested", len(keys)))

var err error
found, bufs, missing, err = i.Cache.FetchChunkData(ctx, keys)
if i.trace {
err = instr.TimeRequestHistogram(ctx, method, requestDuration, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("keys requested", len(keys)))

if err == nil {
sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found)))
} else {
sp.LogFields(otlog.Error(err))
}
var err error
found, bufs, missing, err = i.Cache.Fetch(ctx, keys)

if err == nil {
sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found)))
}

return err
})
} else {
err = UntracedCollectedRequest(ctx, method, instr.NewHistogramCollector(requestDuration), instr.ErrorCode, func(ctx context.Context) error {
var err error
found, bufs, missing, err = i.Cache.Fetch(ctx, keys)

return err
})
}

return err
})
i.fetchedKeys.Add(float64(len(keys)))
i.hits.Add(float64(len(found)))
return found, bufs, missing, err
@@ -87,3 +122,13 @@ func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) (
func (i *instrumentedCache) Stop() error {
return i.Cache.Stop()
}

// UntracedCollectedRequest is the same as instr.CollectedRequest but without any tracing.
func UntracedCollectedRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error {
start := time.Now()
col.Before(method, start)
err := f(ctx)
col.After(method, toStatusCode(err), start)

return err
}
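The exported Instrument and MetricsInstrument above differ only in the trace flag: both count fetched keys and hits in the cortex_cache_* metrics, but only Instrument opens a span per call; MetricsInstrument presumably exists for very hot in-process caches where per-request spans would be noise. A sketch of wrapping two tiers, where the config variable names are assumptions and the constructors all appear in this commit:

memcacheTier := cache.Instrument("memcache", cache.NewMemcached(memcacheCfg, cache.NewMemcachedClient(clientCfg)))
fifoTier := cache.MetricsInstrument("fifo-index", cache.NewFifoCache("fifo-index", 1024, time.Hour))
tiered := cache.Instrument("tiered", cache.NewTiered([]cache.Cache{fifoTier, memcacheTier}))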
8 changes: 4 additions & 4 deletions cache/memcached.go
@@ -63,8 +63,8 @@ func memcacheStatusCode(err error) string {
}
}

// FetchChunkData gets chunks from the chunk cache.
func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
// Fetch gets keys from the cache.
func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
var items map[string]*memcache.Item
err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
var err error
@@ -86,8 +86,8 @@ func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []
return
}

// StoreChunk serializes and stores a chunk in the chunk cache.
func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) error {
// Store stores the key in the cache.
func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error {
return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
item := memcache.Item{
Key: key,
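The commit message also mentions a hack for unicode keys (memcached rejects keys containing whitespace or control characters) and client-side expiry validation, because background write-back can land items late and skew their ages; the hunks implementing both live in files not rendered on this page. Purely as an illustration of the kind of handling involved, none of the names or encodings below are taken from the commit:

// Hypothetical helpers, NOT the commit's implementation.
func safeKey(key string) string {
	// Memcached keys must be short ASCII without whitespace or control characters,
	// so arbitrary (unicode) keys can be made safe by encoding them.
	return hex.EncodeToString([]byte(key))
}

type entry struct {
	Expiry int64  // unix seconds, validated on the client
	Value  []byte
}

func stillValid(e entry, now time.Time) bool {
	// Checking expiry client-side means a delayed background write-back
	// cannot resurrect an entry that should already have aged out.
	return now.Unix() < e.Expiry
}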
25 changes: 19 additions & 6 deletions cache/memcached_client.go
@@ -41,15 +41,28 @@ type MemcachedClientConfig struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.")
cfg.registerFlagsWithPrefix("", f)
}

// newMemcachedClient creates a new MemcacheClient that gets its server list
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.registerFlagsWithPrefix(prefix, f)
}

func (cfg *MemcachedClientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
if prefix != "" {
prefix = prefix + "."
}

f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", "SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.")
}

// NewMemcachedClient creates a new MemcacheClient that gets its server list
// from SRV and updates the server list on a regular basis.
func newMemcachedClient(cfg MemcachedClientConfig) *memcachedClient {
func NewMemcachedClient(cfg MemcachedClientConfig) MemcachedClient {
var servers memcache.ServerList
client := memcache.NewFromSelector(&servers)
client.Timeout = cfg.Timeout
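The prefix support above lets two independent memcached client configurations share one flag set, while an empty prefix keeps the existing flag names. A quick sketch; the "index" prefix is an example, not necessarily the one the index cache registers:

fs := flag.NewFlagSet("example", flag.ContinueOnError)

var chunkCfg, indexCfg cache.MemcachedClientConfig
chunkCfg.RegisterFlags(fs)                    // -memcached.hostname, -memcached.timeout, ...
indexCfg.RegisterFlagsWithPrefix("index", fs) // -index.memcached.hostname, -index.memcached.timeout, ...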
(Remaining changed files were not rendered on this page.)
