diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go
index 2ce7e6dd11..c725719a73 100644
--- a/pkg/store/cache/caching_bucket.go
+++ b/pkg/store/cache/caching_bucket.go
@@ -4,16 +4,19 @@
 package storecache
 
 import (
+	"bytes"
 	"context"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
-	"regexp"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -28,86 +31,82 @@ import (
 const (
 	originCache  = "cache"
 	originBucket = "bucket"
-)
-
-type CachingBucketConfig struct {
-	// Basic unit used to cache chunks.
-	ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"`
-
-	// Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited.
-	MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"`
 
-	// TTLs for various cache items.
-	ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"`
-	ChunkSubrangeTTL   time.Duration `yaml:"chunk_subrange_ttl"`
-}
+	opGet        = "get"
+	opGetRange   = "getrange"
+	opIter       = "iter"
+	opExists     = "exists"
+	opObjectSize = "objectsize"
+)
 
-func DefaultCachingBucketConfig() CachingBucketConfig {
-	return CachingBucketConfig{
-		ChunkSubrangeSize:         16000, // Equal to max chunk size.
-		ChunkObjectSizeTTL:        24 * time.Hour,
-		ChunkSubrangeTTL:          24 * time.Hour,
-		MaxChunksGetRangeRequests: 3,
-	}
-}
+var errObjNotFound = errors.Errorf("object not found")
 
-// Bucket implementation that provides some caching features, using knowledge about how Thanos accesses data.
+// CachingBucket is a Bucket implementation that adds caching features, based on the passed configuration.
 type CachingBucket struct {
 	objstore.Bucket
 
-	cache cache.Cache
-
-	config CachingBucketConfig
-
+	cfg    *CachingBucketConfig
 	logger log.Logger
 
-	requestedChunkBytes prometheus.Counter
-	fetchedChunkBytes   *prometheus.CounterVec
-	refetchedChunkBytes *prometheus.CounterVec
+	requestedGetRangeBytes *prometheus.CounterVec
+	fetchedGetRangeBytes   *prometheus.CounterVec
+	refetchedGetRangeBytes *prometheus.CounterVec
 
-	objectSizeRequests prometheus.Counter
-	objectSizeHits     prometheus.Counter
+	operationConfigs  map[string][]*operationConfig
+	operationRequests *prometheus.CounterVec
+	operationHits     *prometheus.CounterVec
 }
 
-func NewCachingBucket(b objstore.Bucket, c cache.Cache, chunks CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) {
+// NewCachingBucket creates a new caching bucket with the provided configuration. The configuration
+// must not be changed after the caching bucket has been created.
+func NewCachingBucket(b objstore.Bucket, cfg *CachingBucketConfig, logger log.Logger, reg prometheus.Registerer) (*CachingBucket, error) {
 	if b == nil {
 		return nil, errors.New("bucket is nil")
 	}
-	if c == nil {
-		return nil, errors.New("cache is nil")
-	}
 
 	cb := &CachingBucket{
 		Bucket: b,
-		config: chunks,
-		cache:  c,
+		cfg:    cfg,
 		logger: logger,
 
-		requestedChunkBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "thanos_store_bucket_cache_requested_chunk_bytes_total",
-			Help: "Total number of requested bytes for chunk data.",
-		}),
-		fetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "thanos_store_bucket_cache_fetched_chunk_bytes_total",
-			Help: "Total number of fetched chunk bytes. Data from bucket is then stored to cache.",
-		}, []string{"origin"}),
-		refetchedChunkBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "thanos_store_bucket_cache_refetched_chunk_bytes_total",
-			Help: "Total number of chunk bytes re-fetched from storage, despite being in cache already.",
-		}, []string{"origin"}),
-		objectSizeRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "thanos_store_bucket_cache_objectsize_requests_total",
-			Help: "Number of object size requests for objects.",
-		}),
-		objectSizeHits: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "thanos_store_bucket_cache_objectsize_hits_total",
-			Help: "Number of object size hits for objects.",
-		}),
-	}
-
-	cb.fetchedChunkBytes.WithLabelValues(originBucket)
-	cb.fetchedChunkBytes.WithLabelValues(originCache)
-	cb.refetchedChunkBytes.WithLabelValues(originCache)
+		operationConfigs: map[string][]*operationConfig{},
+
+		requestedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "thanos_store_bucket_cache_getrange_requested_bytes_total",
+			Help: "Total number of bytes requested via GetRange.",
+		}, []string{"config"}),
+		fetchedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "thanos_store_bucket_cache_getrange_fetched_bytes_total",
+			Help: "Total number of bytes fetched because of GetRange operation. Data from bucket is then stored to cache.",
+		}, []string{"origin", "config"}),
+		refetchedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "thanos_store_bucket_cache_getrange_refetched_bytes_total",
+			Help: "Total number of bytes re-fetched from storage because of GetRange operation, despite being in cache already.",
+		}, []string{"origin", "config"}),
+
+		operationRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "thanos_store_bucket_cache_operation_requests_total",
+			Help: "Number of requested operations matching given config.",
+		}, []string{"operation", "config"}),
+		operationHits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "thanos_store_bucket_cache_operation_hits_total",
+			Help: "Number of operations served from cache for given config.",
+		}, []string{"operation", "config"}),
+	}
+
+	for op, names := range cfg.allConfigNames() {
+		for _, n := range names {
+			cb.operationRequests.WithLabelValues(op, n)
+			cb.operationHits.WithLabelValues(op, n)
+
+			if op == opGetRange {
+				cb.requestedGetRangeBytes.WithLabelValues(n)
+				cb.fetchedGetRangeBytes.WithLabelValues(originCache, n)
+				cb.fetchedGetRangeBytes.WithLabelValues(originBucket, n)
+				cb.refetchedGetRangeBytes.WithLabelValues(originCache, n)
+			}
+		}
+	}
 
 	return cb, nil
 }
@@ -132,35 +131,183 @@ func (cb *CachingBucket) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailur
 	return cb.WithExpectedErrs(expectedFunc)
 }
 
-var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`)
+func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) error) error {
+	cfgName, cfg := cb.cfg.findIterConfig(dir)
+	if cfg == nil {
+		return cb.Bucket.Iter(ctx, dir, f)
+	}
 
-func isTSDBChunkFile(name string) bool {
-	return chunksMatcher.MatchString(name)
+	cb.operationRequests.WithLabelValues(opIter, cfgName).Inc()
+
+	key := cachingKeyIter(dir)
+	data := cfg.cache.Fetch(ctx, []string{key})
+	if data[key] != nil {
+		list, err := cfg.codec.Decode(data[key])
+		if err == nil {
+			cb.operationHits.WithLabelValues(opIter, cfgName).Inc()
+			for _, n := range list {
+				if err := f(n); err != nil {
+					return err
+				}
+			}
+			return nil
+		}
+		level.Warn(cb.logger).Log("msg", "failed to decode cached Iter result", "key", key, "err", err)
+	}
+
+	// Iteration can take a while (esp. since it calls the callback function for each item),
+	// and the iter TTL is generally low. Compute the remaining TTL from the time the iteration started.
+	iterTime := time.Now()
+	var list []string
+	err := cb.Bucket.Iter(ctx, dir, func(s string) error {
+		list = append(list, s)
+		return f(s)
+	})
+
+	remainingTTL := cfg.ttl - time.Since(iterTime)
+	if err == nil && remainingTTL > 0 {
+		data, encErr := cfg.codec.Encode(list)
+		if encErr == nil {
+			cfg.cache.Store(ctx, map[string][]byte{key: data}, remainingTTL)
+			return nil
+		}
+		level.Warn(cb.logger).Log("msg", "failed to encode Iter result", "key", key, "err", encErr)
+	}
+	return err
+}
+
+func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) {
+	cfgName, cfg := cb.cfg.findExistConfig(name)
+	if cfg == nil {
+		return cb.Bucket.Exists(ctx, name)
+	}
+
+	cb.operationRequests.WithLabelValues(opExists, cfgName).Inc()
+
+	key := cachingKeyExists(name)
+	hits := cfg.cache.Fetch(ctx, []string{key})
+
+	if ex := hits[key]; ex != nil {
+		exists, err := strconv.ParseBool(string(ex))
+		if err == nil {
+			cb.operationHits.WithLabelValues(opExists, cfgName).Inc()
+			return exists, nil
+		}
+		level.Warn(cb.logger).Log("msg", "unexpected cached 'exists' value", "key", key, "val", string(ex))
+	}
+
+	existsTime := time.Now()
+	ok, err := cb.Bucket.Exists(ctx, name)
+	if err == nil {
+		storeExistsCacheEntry(ctx, key, ok, existsTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL)
+	}
+
+	return ok, err
+}
+
+func storeExistsCacheEntry(ctx context.Context, cachingKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) {
+	var ttl time.Duration
+	if exists {
+		ttl = existsTTL - time.Since(ts)
+	} else {
+		ttl = doesntExistTTL - time.Since(ts)
+	}
+
+	if ttl > 0 {
+		cache.Store(ctx, map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl)
+	}
+}
+
+func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	cfgName, cfg := cb.cfg.findGetConfig(name)
+	if cfg == nil {
+		return cb.Bucket.Get(ctx, name)
+	}
+
+	cb.operationRequests.WithLabelValues(opGet, cfgName).Inc()
+
+	contentKey := cachingKeyContent(name)
+	existsKey := cachingKeyExists(name)
+
+	hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey})
+	if hits[contentKey] != nil {
+		cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
+		return ioutil.NopCloser(bytes.NewReader(hits[contentKey])), nil
+	}
+
+	// If we know that the file doesn't exist, we can return that. Useful for deletion marks.
+	if ex := hits[existsKey]; ex != nil {
+		if exists, err := strconv.ParseBool(string(ex)); err == nil && !exists {
+			cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
+			return nil, errObjNotFound
+		}
+	}
+
+	getTime := time.Now()
+	reader, err := cb.Bucket.Get(ctx, name)
+	if err != nil {
+		if cb.Bucket.IsObjNotFoundErr(err) {
+			// Cache that the object doesn't exist.
+			storeExistsCacheEntry(ctx, existsKey, false, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL)
+		}
+
+		return nil, err
+	}
+
+	storeExistsCacheEntry(ctx, existsKey, true, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL)
+	return &getReader{
+		c:         cfg.cache,
+		ctx:       ctx,
+		r:         reader,
+		buf:       new(bytes.Buffer),
+		startTime: getTime,
+		ttl:       cfg.contentTTL,
+		cacheKey:  contentKey,
+		maxSize:   cfg.maxCacheableSize,
+	}, nil
+}
+
+func (cb *CachingBucket) IsObjNotFoundErr(err error) bool {
+	return err == errObjNotFound || cb.Bucket.IsObjNotFoundErr(err)
 }
 
 func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
-	if isTSDBChunkFile(name) && off >= 0 && length > 0 {
-		var (
-			r   io.ReadCloser
-			err error
-		)
-		tracing.DoInSpan(ctx, "cachingbucket_getrange_chunkfile", func(ctx context.Context) {
-			r, err = cb.getRangeChunkFile(ctx, name, off, length)
-		})
-		return r, err
+	if off < 0 || length <= 0 {
+		return cb.Bucket.GetRange(ctx, name, off, length)
 	}
 
-	return cb.Bucket.GetRange(ctx, name, off, length)
+	cfgName, cfg := cb.cfg.findGetRangeConfig(name)
+	if cfg == nil {
+		return cb.Bucket.GetRange(ctx, name, off, length)
+	}
+
+	var (
+		r   io.ReadCloser
+		err error
+	)
+	tracing.DoInSpan(ctx, "cachingbucket_getrange", func(ctx context.Context) {
+		r, err = cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
+	})
+	return r, err
 }
 
-func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl time.Duration) (uint64, error) {
+func (cb *CachingBucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
+	cfgName, cfg := cb.cfg.findObjectSizeConfig(name)
+	if cfg == nil {
+		return cb.Bucket.ObjectSize(ctx, name)
+	}
+
+	return cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.ttl)
+}
+
+func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (uint64, error) {
 	key := cachingKeyObjectSize(name)
 
-	cb.objectSizeRequests.Add(1)
+	cb.operationRequests.WithLabelValues(opObjectSize, cfgName).Inc()
 
-	hits := cb.cache.Fetch(ctx, []string{key})
+	hits := cache.Fetch(ctx, []string{key})
 	if s := hits[key]; len(s) == 8 {
-		cb.objectSizeHits.Add(1)
+		cb.operationHits.WithLabelValues(opObjectSize, cfgName).Inc()
 		return binary.BigEndian.Uint64(s), nil
 	}
 
@@ -171,17 +318,18 @@ func (cb *CachingBucket) cachedObjectSize(ctx context.Context, name string, ttl
 
 	var buf [8]byte
 	binary.BigEndian.PutUint64(buf[:], size)
-	cb.cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl)
+	cache.Store(ctx, map[string][]byte{key: buf[:]}, ttl)
 
 	return size, nil
 }
 
-func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
-	cb.requestedChunkBytes.Add(float64(length))
+func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
+	cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc()
+	cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length))
 
-	size, err := cb.cachedObjectSize(ctx, name, cb.config.ChunkObjectSizeTTL)
+	size, err := cb.cachedObjectSize(ctx, name, cfgName, cfg.cache, cfg.objectSizeTTL)
 	if err != nil {
-		return nil, errors.Wrapf(err, "failed to get size of chunk file: %s", name)
+		return nil, errors.Wrapf(err, "failed to get size of object: %s", name)
 	}
 
 	// If length goes over object size, adjust length. We use it later to limit number of read bytes.
@@ -190,30 +338,32 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name string, off
 	}
 
 	// Start and end range are subrange-aligned offsets into object, that we're going to read.
-	startRange := (offset / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize
-	endRange := ((offset + length) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize
-	if (offset+length)%cb.config.ChunkSubrangeSize > 0 {
-		endRange += cb.config.ChunkSubrangeSize
+	startRange := (offset / cfg.subrangeSize) * cfg.subrangeSize
+	endRange := ((offset + length) / cfg.subrangeSize) * cfg.subrangeSize
+	if (offset+length)%cfg.subrangeSize > 0 {
+		endRange += cfg.subrangeSize
 	}
 
 	// The very last subrange in the object may have length that is not divisible by subrange size.
-	lastSubrangeOffset := endRange - cb.config.ChunkSubrangeSize
-	lastSubrangeLength := int(cb.config.ChunkSubrangeSize)
+	lastSubrangeOffset := endRange - cfg.subrangeSize
+	lastSubrangeLength := int(cfg.subrangeSize)
 	if uint64(endRange) > size {
-		lastSubrangeOffset = (int64(size) / cb.config.ChunkSubrangeSize) * cb.config.ChunkSubrangeSize
+		lastSubrangeOffset = (int64(size) / cfg.subrangeSize) * cfg.subrangeSize
 		lastSubrangeLength = int(int64(size) - lastSubrangeOffset)
 	}
 
-	numSubranges := (endRange - startRange) / cb.config.ChunkSubrangeSize
+	numSubranges := (endRange - startRange) / cfg.subrangeSize
 
 	offsetKeys := make(map[int64]string, numSubranges)
 	keys := make([]string, 0, numSubranges)
 
-	for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize {
-		end := off + cb.config.ChunkSubrangeSize
+	totalRequestedBytes := int64(0)
+	for off := startRange; off < endRange; off += cfg.subrangeSize {
+		end := off + cfg.subrangeSize
 		if end > int64(size) {
 			end = int64(size)
 		}
+		totalRequestedBytes += (end - off)
 
 		k := cachingKeyObjectSubrange(name, off, end)
 		keys = append(keys, k)
@@ -221,44 +371,47 @@ func (cb *CachingBucket) getRangeChunkFile(ctx context.Context, name st
 	}
 
 	// Try to get all subranges from the cache.
-	hits := cb.cache.Fetch(ctx, keys)
+	totalCachedBytes := int64(0)
+	hits := cfg.cache.Fetch(ctx, keys)
 	for _, b := range hits {
-		cb.fetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(b)))
+		totalCachedBytes += int64(len(b))
 	}
+	cb.fetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(totalCachedBytes))
+	cb.operationHits.WithLabelValues(opGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys)))
 
 	if len(hits) < len(keys) {
 		if hits == nil {
 			hits = map[string][]byte{}
 		}
 
-		err := cb.fetchMissingChunkSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength)
+		err := cb.fetchMissingSubranges(ctx, name, startRange, endRange, offsetKeys, hits, lastSubrangeOffset, lastSubrangeLength, cfgName, cfg)
 		if err != nil {
 			return nil, err
 		}
 	}
 
-	return ioutil.NopCloser(newSubrangesReader(cb.config.ChunkSubrangeSize, offsetKeys, hits, offset, length)), nil
+	return ioutil.NopCloser(newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length)), nil
 }
 
 type rng struct {
 	start, end int64
 }
 
-// fetchMissingChunkSubranges fetches missing subranges, stores them into "hits" map
+// fetchMissingSubranges fetches missing subranges, stores them into "hits" map
 // and into cache as well (using provided cacheKeys).
-func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int) error {
+func (cb *CachingBucket) fetchMissingSubranges(ctx context.Context, name string, startRange, endRange int64, cacheKeys map[int64]string, hits map[string][]byte, lastSubrangeOffset int64, lastSubrangeLength int, cfgName string, cfg *getRangeConfig) error {
 	// Ordered list of missing sub-ranges.
 	var missing []rng
 
-	for off := startRange; off < endRange; off += cb.config.ChunkSubrangeSize {
+	for off := startRange; off < endRange; off += cfg.subrangeSize {
 		if hits[cacheKeys[off]] == nil {
-			missing = append(missing, rng{start: off, end: off + cb.config.ChunkSubrangeSize})
+			missing = append(missing, rng{start: off, end: off + cfg.subrangeSize})
 		}
 	}
 
 	missing = mergeRanges(missing, 0) // Merge adjacent ranges.
 	// Keep merging until we have only max number of ranges (= requests).
-	for limit := cb.config.ChunkSubrangeSize; cb.config.MaxChunksGetRangeRequests > 0 && len(missing) > cb.config.MaxChunksGetRangeRequests; limit = limit * 2 {
+	for limit := cfg.subrangeSize; cfg.maxSubRequests > 0 && len(missing) > cfg.maxSubRequests; limit = limit * 2 {
 		missing = mergeRanges(missing, limit)
 	}
 
@@ -275,7 +428,7 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st
 			}
 			defer runutil.CloseWithLogOnErr(cb.logger, r, "fetching range [%d, %d]", m.start, m.end)
 
-			for off := m.start; off < m.end && gctx.Err() == nil; off += cb.config.ChunkSubrangeSize {
+			for off := m.start; off < m.end && gctx.Err() == nil; off += cfg.subrangeSize {
 				key := cacheKeys[off]
 				if key == "" {
 					return errors.Errorf("fetching range [%d, %d]: caching key for offset %d not found", m.start, m.end, off)
@@ -288,7 +441,7 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st
 					// if object length isn't divisible by subrange size.
 					subrangeData = make([]byte, lastSubrangeLength)
 				} else {
-					subrangeData = make([]byte, cb.config.ChunkSubrangeSize)
+					subrangeData = make([]byte, cfg.subrangeSize)
 				}
 				_, err := io.ReadFull(r, subrangeData)
 				if err != nil {
@@ -304,10 +457,10 @@ func (cb *CachingBucket) fetchMissingChunkSubranges(ctx context.Context, name st
 				hitsMutex.Unlock()
 
 				if storeToCache {
-					cb.fetchedChunkBytes.WithLabelValues(originBucket).Add(float64(len(subrangeData)))
-					cb.cache.Store(gctx, map[string][]byte{key: subrangeData}, cb.config.ChunkSubrangeTTL)
+					cb.fetchedGetRangeBytes.WithLabelValues(originBucket, cfgName).Add(float64(len(subrangeData)))
+					cfg.cache.Store(gctx, map[string][]byte{key: subrangeData}, cfg.subrangeTTL)
 				} else {
-					cb.refetchedChunkBytes.WithLabelValues(originCache).Add(float64(len(subrangeData)))
+					cb.refetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(len(subrangeData)))
 				}
 			}
 
@@ -344,6 +497,18 @@ func cachingKeyObjectSubrange(name string, start int64, end int64) string {
 	return fmt.Sprintf("subrange:%s:%d:%d", name, start, end)
 }
 
+func cachingKeyIter(name string) string {
+	return fmt.Sprintf("iter:%s", name)
+}
+
+func cachingKeyExists(name string) string {
+	return fmt.Sprintf("exists:%s", name)
+}
+
+func cachingKeyContent(name string) string {
+	return fmt.Sprintf("content:%s", name)
+}
+
 // Reader implementation that uses in-memory subranges.
 type subrangesReader struct {
 	subrangeSize int64
@@ -409,3 +574,56 @@ func (c *subrangesReader) subrangeAt(offset int64) ([]byte, error) {
 	}
 	return b, nil
 }
+
+type getReader struct {
+	c         cache.Cache
+	ctx       context.Context
+	r         io.ReadCloser
+	buf       *bytes.Buffer
+	startTime time.Time
+	ttl       time.Duration
+	cacheKey  string
+	maxSize   int
+}
+
+func (g *getReader) Close() error {
+	// We don't know if entire object was read, don't store it here.
+	g.buf = nil
+	return g.r.Close()
+}
+
+func (g *getReader) Read(p []byte) (n int, err error) {
+	n, err = g.r.Read(p)
+	if n > 0 && g.buf != nil {
+		if g.buf.Len()+n <= g.maxSize {
+			g.buf.Write(p[:n])
+		} else {
+			// Object is larger than max size, stop caching.
+			g.buf = nil
+		}
+	}
+
+	if err == io.EOF && g.buf != nil {
+		remainingTTL := g.ttl - time.Since(g.startTime)
+		if remainingTTL > 0 {
+			g.c.Store(g.ctx, map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL)
+		}
+		// Clear reference, to avoid doing another Store on next read.
+		g.buf = nil
+	}
+
+	return n, err
+}
+
+// JSONIterCodec encodes iter results into JSON. Suitable for root dir.
+type JSONIterCodec struct{}
+
+func (jic JSONIterCodec) Encode(files []string) ([]byte, error) {
+	return json.Marshal(files)
+}
+
+func (jic JSONIterCodec) Decode(data []byte) ([]string, error) {
+	var list []string
+	err := json.Unmarshal(data, &list)
+	return list, err
+}
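The subrange arithmetic in cachedGetRange above is easiest to follow with concrete numbers. Below is a minimal standalone sketch of the same alignment rule (illustrative only, not part of this patch; the 16000-byte subrange size simply matches the chunk default used by the factory later in this diff):

package main

import "fmt"

// alignedRange mirrors the alignment step of cachedGetRange: it widens the
// requested [offset, offset+length) to subrange boundaries; each aligned
// subrange then becomes one cache key ("subrange:<name>:<start>:<end>").
func alignedRange(offset, length, subrangeSize int64) (startRange, endRange int64) {
	startRange = (offset / subrangeSize) * subrangeSize
	endRange = ((offset + length) / subrangeSize) * subrangeSize
	if (offset+length)%subrangeSize > 0 {
		endRange += subrangeSize
	}
	return startRange, endRange
}

func main() {
	// Reading 100 bytes at offset 15950 crosses a subrange boundary, so two
	// subranges are involved: [0, 16000) and [16000, 32000).
	start, end := alignedRange(15950, 100, 16000)
	fmt.Println(start, end) // Prints: 0 32000
}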
diff --git a/pkg/store/cache/caching_bucket_config.go b/pkg/store/cache/caching_bucket_config.go
new file mode 100644
index 0000000000..dce0350fdf
--- /dev/null
+++ b/pkg/store/cache/caching_bucket_config.go
@@ -0,0 +1,208 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package storecache
+
+import (
+	"time"
+
+	"github.com/thanos-io/thanos/pkg/cache"
+)
+
+// IterCodec is a codec for encoding and decoding results of an Iter call.
+type IterCodec interface {
+	Encode(files []string) ([]byte, error)
+	Decode(cachedData []byte) ([]string, error)
+}
+
+// CachingBucketConfig contains low-level configuration for individual bucket operations.
+// This is not exposed to the user, but it is expected that code sets up individual
+// operations based on user-provided configuration.
+type CachingBucketConfig struct {
+	get        map[string]*getConfig
+	iter       map[string]*iterConfig
+	exists     map[string]*existsConfig
+	getRange   map[string]*getRangeConfig
+	objectSize map[string]*objectSizeConfig
+}
+
+func NewCachingBucketConfig() *CachingBucketConfig {
+	return &CachingBucketConfig{
+		get:        map[string]*getConfig{},
+		iter:       map[string]*iterConfig{},
+		exists:     map[string]*existsConfig{},
+		getRange:   map[string]*getRangeConfig{},
+		objectSize: map[string]*objectSizeConfig{},
+	}
+}
+
+// Generic config for a single operation.
+type operationConfig struct {
+	matcher func(name string) bool
+	cache   cache.Cache
+}
+
+// Operation-specific configs.
+type iterConfig struct {
+	operationConfig
+	ttl   time.Duration
+	codec IterCodec
+}
+
+type existsConfig struct {
+	operationConfig
+	existsTTL      time.Duration
+	doesntExistTTL time.Duration
+}
+
+type getConfig struct {
+	existsConfig
+	contentTTL       time.Duration
+	maxCacheableSize int
+}
+
+type getRangeConfig struct {
+	operationConfig
+	subrangeSize   int64
+	maxSubRequests int
+	objectSizeTTL  time.Duration
+	subrangeTTL    time.Duration
+}
+
+type objectSizeConfig struct {
+	operationConfig
+	ttl time.Duration
+}
+
+func newOperationConfig(cache cache.Cache, matcher func(string) bool) operationConfig {
+	if cache == nil {
+		panic("cache")
+	}
+	if matcher == nil {
+		panic("matcher")
+	}
+
+	return operationConfig{
+		matcher: matcher,
+		cache:   cache,
+	}
+}
+
+// CacheIter configures caching of "Iter" operation for matching directories.
+func (cfg *CachingBucketConfig) CacheIter(configName string, cache cache.Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec) {
+	cfg.iter[configName] = &iterConfig{
+		operationConfig: newOperationConfig(cache, matcher),
+		ttl:             ttl,
+		codec:           codec,
+	}
+}
+
+// CacheGet configures caching of "Get" operation for matching files. Content of the object is cached, as well as whether the object exists or not.
+func (cfg *CachingBucketConfig) CacheGet(configName string, cache cache.Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration) {
+	cfg.get[configName] = &getConfig{
+		existsConfig: existsConfig{
+			operationConfig: newOperationConfig(cache, matcher),
+			existsTTL:       existsTTL,
+			doesntExistTTL:  doesntExistTTL,
+		},
+		contentTTL:       contentTTL,
+		maxCacheableSize: maxCacheableSize,
+	}
+}
+
+// CacheExists configures caching of "Exists" operation for matching files. Negative values are cached as well.
+func (cfg *CachingBucketConfig) CacheExists(configName string, cache cache.Cache, matcher func(string) bool, existsTTL, doesntExistTTL time.Duration) {
+	cfg.exists[configName] = &existsConfig{
+		operationConfig: newOperationConfig(cache, matcher),
+		existsTTL:       existsTTL,
+		doesntExistTTL:  doesntExistTTL,
+	}
+}
+
+// CacheGetRange configures caching of "GetRange" operation. Subranges (aligned on subrange size) are cached individually.
+// Since the caching operation needs to know the object size to compute correct subranges, the object size is cached as well.
+// A single "GetRange" request can result in multiple smaller GetRange sub-requests issued on the underlying bucket.
+// MaxSubRequests specifies how many such sub-requests may be issued. Values <= 0 mean there is no limit (requests
+// for adjacent missing subranges are still merged).
+func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cache, matcher func(string) bool, subrangeSize int64, objectSizeTTL, subrangeTTL time.Duration, maxSubRequests int) {
+	cfg.getRange[configName] = &getRangeConfig{
+		operationConfig: newOperationConfig(cache, matcher),
+		subrangeSize:    subrangeSize,
+		objectSizeTTL:   objectSizeTTL,
+		subrangeTTL:     subrangeTTL,
+		maxSubRequests:  maxSubRequests,
+	}
+}
+
+// CacheObjectSize configures caching of "ObjectSize" operation for matching files.
+func (cfg *CachingBucketConfig) CacheObjectSize(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) {
+	cfg.objectSize[configName] = &objectSizeConfig{
+		operationConfig: newOperationConfig(cache, matcher),
+		ttl:             ttl,
+	}
+}
+
+func (cfg *CachingBucketConfig) allConfigNames() map[string][]string {
+	result := map[string][]string{}
+	for n := range cfg.get {
+		result[opGet] = append(result[opGet], n)
+	}
+	for n := range cfg.iter {
+		result[opIter] = append(result[opIter], n)
+	}
+	for n := range cfg.exists {
+		result[opExists] = append(result[opExists], n)
+	}
+	for n := range cfg.getRange {
+		result[opGetRange] = append(result[opGetRange], n)
+	}
+	for n := range cfg.objectSize {
+		result[opObjectSize] = append(result[opObjectSize], n)
+	}
+	return result
+}
+
+func (cfg *CachingBucketConfig) findIterConfig(dir string) (string, *iterConfig) {
+	for n, cfg := range cfg.iter {
+		if cfg.matcher(dir) {
+			return n, cfg
+		}
+	}
+	return "", nil
+}
+
+func (cfg *CachingBucketConfig) findExistConfig(name string) (string, *existsConfig) {
+	for n, cfg := range cfg.exists {
+		if cfg.matcher(name) {
+			return n, cfg
+		}
+	}
+	return "", nil
+}
+
+func (cfg *CachingBucketConfig) findGetConfig(name string) (string, *getConfig) {
+	for n, cfg := range cfg.get {
+		if cfg.matcher(name) {
+			return n, cfg
+		}
+	}
+	return "", nil
+}
+
+func (cfg *CachingBucketConfig) findGetRangeConfig(name string) (string, *getRangeConfig) {
+	for n, cfg := range cfg.getRange {
+		if cfg.matcher(name) {
+			return n, cfg
+		}
+	}
+	return "", nil
+}
+
+func (cfg *CachingBucketConfig) findObjectSizeConfig(name string) (string, *objectSizeConfig) {
+	for n, cfg := range cfg.objectSize {
+		if cfg.matcher(name) {
+			return n, cfg
+		}
+	}
+	return "", nil
+}
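For code wiring this configuration up directly (rather than through the YAML factory in the next file), usage looks roughly like the following sketch. The config names, matchers, and TTLs here are illustrative assumptions only; it is written as if it lived inside package storecache, with the strings and time packages imported and some cache.Cache implementation at hand:

// buildConfig assembles a CachingBucketConfig by hand; all values are examples.
func buildConfig(c cache.Cache) *CachingBucketConfig {
	cfg := NewCachingBucketConfig()

	// Cache 16000-byte subranges of chunk files for a day, issuing at most
	// 3 GetRange sub-requests to the underlying bucket per call.
	cfg.CacheGetRange("chunks", c, func(name string) bool {
		return strings.Contains(name, "/chunks/")
	}, 16000, 24*time.Hour, 24*time.Hour, 3)

	// Cache content and existence of small meta files; remember missing
	// files for a shorter time than existing ones.
	cfg.CacheGet("metas", c, func(name string) bool {
		return strings.HasSuffix(name, "/meta.json")
	}, 1024*1024, 24*time.Hour, 2*time.Hour, 15*time.Minute)

	return cfg
}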
diff --git a/pkg/store/cache/caching_bucket_factory.go b/pkg/store/cache/caching_bucket_factory.go
index 0bc0a78458..3dbd60276e 100644
--- a/pkg/store/cache/caching_bucket_factory.go
+++ b/pkg/store/cache/caching_bucket_factory.go
@@ -4,38 +4,71 @@
 package storecache
 
 import (
+	"regexp"
+	"strings"
+	"time"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"gopkg.in/yaml.v2"
 
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	cache "github.com/thanos-io/thanos/pkg/cache"
 	"github.com/thanos-io/thanos/pkg/cacheutil"
+	"github.com/thanos-io/thanos/pkg/model"
 	"github.com/thanos-io/thanos/pkg/objstore"
 )
 
 // BucketCacheProvider is a type used to evaluate all bucket cache providers.
 type BucketCacheProvider string
 
-const (
-	MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket.
-)
+const MemcachedBucketCacheProvider BucketCacheProvider = "memcached" // Memcached cache-provider for caching bucket.
 
-// CachingBucketWithBackendConfig is a configuration of caching bucket used by Store component.
-type CachingBucketWithBackendConfig struct {
+// CachingWithBackendConfig is a configuration of caching bucket used by Store component.
+type CachingWithBackendConfig struct {
 	Type          BucketCacheProvider `yaml:"backend"`
 	BackendConfig interface{}         `yaml:"backend_config"`
 
-	CachingBucketConfig CachingBucketConfig `yaml:"caching_config"`
+	// Basic unit used to cache chunks.
+	ChunkSubrangeSize int64 `yaml:"chunk_subrange_size"`
+
+	// Maximum number of GetRange requests issued by this bucket for single GetRange call. Zero or negative value = unlimited.
+	MaxChunksGetRangeRequests int `yaml:"max_chunks_get_range_requests"`
+
+	// TTLs for various cache items.
+	ChunkObjectSizeTTL time.Duration `yaml:"chunk_object_size_ttl"`
+	ChunkSubrangeTTL   time.Duration `yaml:"chunk_subrange_ttl"`
+
+	// How long to cache result of Iter call in root directory.
+	BlocksIterTTL time.Duration `yaml:"blocks_iter_ttl"`
+
+	// Config for Exists and Get operations for metadata files.
+	MetafileExistsTTL      time.Duration `yaml:"metafile_exists_ttl"`
+	MetafileDoesntExistTTL time.Duration `yaml:"metafile_doesnt_exist_ttl"`
+	MetafileContentTTL     time.Duration `yaml:"metafile_content_ttl"`
+	MetafileMaxSize        model.Bytes   `yaml:"metafile_max_size"`
+}
+
+func (cfg *CachingWithBackendConfig) Defaults() {
+	cfg.ChunkSubrangeSize = 16000 // Equal to max chunk size.
+	cfg.ChunkObjectSizeTTL = 24 * time.Hour
+	cfg.ChunkSubrangeTTL = 24 * time.Hour
+	cfg.MaxChunksGetRangeRequests = 3
+	cfg.BlocksIterTTL = 5 * time.Minute
+	cfg.MetafileExistsTTL = 2 * time.Hour
+	cfg.MetafileDoesntExistTTL = 15 * time.Minute
+	cfg.MetafileContentTTL = 24 * time.Hour
+	cfg.MetafileMaxSize = 1024 * 1024 // Equal to default MaxItemSize in memcached client.
 }
 
 // NewCachingBucketFromYaml uses YAML configuration to create new caching bucket.
 func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
 	level.Info(logger).Log("msg", "loading caching bucket configuration")
 
-	config := &CachingBucketWithBackendConfig{}
-	config.CachingBucketConfig = DefaultCachingBucketConfig()
+	config := &CachingWithBackendConfig{}
+	config.Defaults()
 
 	if err := yaml.UnmarshalStrict(yamlContent, config); err != nil {
 		return nil, errors.Wrap(err, "parsing config YAML file")
@@ -60,5 +93,32 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger
 		return nil, errors.Errorf("unsupported cache type: %s", config.Type)
 	}
 
-	return NewCachingBucket(bucket, c, config.CachingBucketConfig, logger, reg)
+	cfg := NewCachingBucketConfig()
+
+	// Configure cache.
+	cfg.CacheGetRange("chunks", c, isTSDBChunkFile, config.ChunkSubrangeSize, config.ChunkObjectSizeTTL, config.ChunkSubrangeTTL, config.MaxChunksGetRangeRequests)
+	cfg.CacheExists("meta.jsons", c, isMetaFile, config.MetafileExistsTTL, config.MetafileDoesntExistTTL)
+	cfg.CacheGet("meta.jsons", c, isMetaFile, int(config.MetafileMaxSize), config.MetafileContentTTL, config.MetafileExistsTTL, config.MetafileDoesntExistTTL)
+
+	// Cache Iter requests for root.
+	cfg.CacheIter("blocks-iter", c, isBlocksRootDir, config.BlocksIterTTL, JSONIterCodec{})
+
+	cb, err := NewCachingBucket(bucket, cfg, logger, reg)
+	if err != nil {
+		return nil, err
+	}
+
+	return cb, nil
+}
+
+var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`)
+
+func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) }
+
+func isMetaFile(name string) bool {
+	return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename)
+}
+
+func isBlocksRootDir(name string) bool {
+	return name == ""
 }
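End to end, the factory above could be driven by YAML such as the following hypothetical snippet (the memcached address is a placeholder, and any fields omitted from the YAML keep the Defaults() values; keys follow the yaml tags on CachingWithBackendConfig):

package main

import (
	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/objstore"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func newCachedBucket(bkt objstore.Bucket) (objstore.InstrumentedBucket, error) {
	// backend_config is passed through to the memcached client configuration.
	yamlCfg := []byte(`
backend: memcached
backend_config:
  addresses: ["localhost:11211"]
chunk_subrange_size: 16000
max_chunks_get_range_requests: 3
blocks_iter_ttl: 5m
`)
	return storecache.NewCachingBucketFromYaml(yamlCfg, bkt, log.NewNopLogger(), nil)
}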
diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go
index 2bbc37c887..d392d9066e 100644
--- a/pkg/store/cache/caching_bucket_test.go
+++ b/pkg/store/cache/caching_bucket_test.go
@@ -9,6 +9,8 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"sort"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -17,10 +19,13 @@ import (
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 
 	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
-func TestCachingBucket(t *testing.T) {
+const testFilename = "/random_object"
+
+func TestChunksCaching(t *testing.T) {
 	length := int64(1024 * 1024)
 	subrangeSize := int64(16000) // All tests are based on this value.
@@ -35,7 +40,7 @@ func TestCachingBucket(t *testing.T) {
 	testutil.Ok(t, inmem.Upload(context.Background(), name, bytes.NewReader(data)))
 
 	// We reuse cache between tests (!)
-	cache := &mockCache{cache: make(map[string][]byte)}
+	cache := newMockCache()
 
 	// Warning, these tests must be run in order, they depend cache state from previous test.
 	for _, tc := range []struct {
@@ -106,7 +111,7 @@ func TestCachingBucket(t *testing.T) {
 			expectedFetchedBytes: length,
 			expectedCachedBytes:  0, // Cache is flushed.
 			init: func() {
-				cache.cache = map[string][]byte{} // Flush cache.
+				cache.flush()
 			},
 		},
@@ -217,24 +222,21 @@ func TestCachingBucket(t *testing.T) {
 				tc.init()
 			}
 
-			cfg := DefaultCachingBucketConfig()
-			cfg.ChunkSubrangeSize = subrangeSize
-			cfg.MaxChunksGetRangeRequests = tc.maxGetRangeRequests
+			cfg := NewCachingBucketConfig()
+			cfg.CacheGetRange("chunks", cache, isTSDBChunkFile, subrangeSize, time.Hour, time.Hour, tc.maxGetRangeRequests)
 
-			cachingBucket, err := NewCachingBucket(inmem, cache, cfg, nil, nil)
+			cachingBucket, err := NewCachingBucket(inmem, cfg, nil, nil)
 			testutil.Ok(t, err)
 
 			verifyGetRange(t, cachingBucket, name, tc.offset, tc.length, tc.expectedLength)
-			testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originCache))))
-			testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedChunkBytes.WithLabelValues(originBucket))))
-			testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedChunkBytes.WithLabelValues(originCache))))
+			testutil.Equals(t, tc.expectedCachedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedGetRangeBytes.WithLabelValues(originCache, "chunks"))))
+			testutil.Equals(t, tc.expectedFetchedBytes, int64(promtest.ToFloat64(cachingBucket.fetchedGetRangeBytes.WithLabelValues(originBucket, "chunks"))))
+			testutil.Equals(t, tc.expectedRefetchedBytes, int64(promtest.ToFloat64(cachingBucket.refetchedGetRangeBytes.WithLabelValues(originCache, "chunks"))))
 		})
 	}
 }
 
 func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, offset, length int64, expectedLength int64) {
-	t.Helper()
-
 	r, err := cachingBucket.GetRange(context.Background(), name, offset, length)
 	testutil.Ok(t, err)
@@ -249,16 +251,29 @@ func verifyGetRange(t *testing.T, cachingBucket *CachingBucket, name string, off
 	}
 }
 
+type cacheItem struct {
+	data []byte
+	exp  time.Time
+}
+
 type mockCache struct {
 	mu    sync.Mutex
-	cache map[string][]byte
+	cache map[string]cacheItem
+}
+
+func newMockCache() *mockCache {
+	c := &mockCache{}
+	c.flush()
+	return c
 }
 
-func (m *mockCache) Store(_ context.Context, data map[string][]byte, _ time.Duration) {
+func (m *mockCache) Store(_ context.Context, data map[string][]byte, ttl time.Duration) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+
+	exp := time.Now().Add(ttl)
 	for key, val := range data {
-		m.cache[key] = val
+		m.cache[key] = cacheItem{data: val, exp: exp}
 	}
 }
@@ -268,16 +283,21 @@ func (m *mockCache) Fetch(_ context.Context, keys []string) map[string][]byte {
 
 	found := make(map[string][]byte, len(keys))
 
+	now := time.Now()
 	for _, k := range keys {
 		v, ok := m.cache[k]
-		if ok {
-			found[k] = v
+		if ok && now.Before(v.exp) {
+			found[k] = v.data
 		}
 	}
 
 	return found
 }
 
+func (m *mockCache) flush() {
+	m.cache = map[string]cacheItem{}
+}
+
 func TestMergeRanges(t *testing.T) {
 	for ix, tc := range []struct {
 		input    []rng
@@ -315,7 +335,11 @@
 
 func TestInvalidOffsetAndLength(t *testing.T) {
 	b := &testBucket{objstore.NewInMemBucket()}
-	c, err := NewCachingBucket(b, &mockCache{cache: make(map[string][]byte)}, DefaultCachingBucketConfig(), nil, nil)
+
+	cfg := NewCachingBucketConfig()
+	cfg.CacheGetRange("chunks", newMockCache(), func(string) bool { return true }, 10000, time.Hour, time.Hour, 3)
+
+	c, err := NewCachingBucket(b, cfg, nil, nil)
 	testutil.Ok(t, err)
 
 	r, err := c.GetRange(context.Background(), "test", -1, 1000)
@@ -342,3 +366,293 @@ func (b *testBucket) GetRange(ctx context.Context, name string, off, length int6
 	return b.InMemBucket.GetRange(ctx, name, off, length)
 }
+
+func TestCachedIter(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+	testutil.Ok(t, inmem.Upload(context.Background(), "/file-1", strings.NewReader("hej")))
+	testutil.Ok(t, inmem.Upload(context.Background(), "/file-2", strings.NewReader("ahoj")))
+	testutil.Ok(t, inmem.Upload(context.Background(), "/file-3", strings.NewReader("hello")))
+	testutil.Ok(t, inmem.Upload(context.Background(), "/file-4", strings.NewReader("ciao")))
+
+	allFiles := []string{"/file-1", "/file-2", "/file-3", "/file-4"}
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	const cfgName = "dirs"
+	cfg := NewCachingBucketConfig()
+	cfg.CacheIter(cfgName, cache, func(string) bool { return true }, 5*time.Minute, JSONIterCodec{})
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	verifyIter(t, cb, allFiles, false, cfgName)
+
+	testutil.Ok(t, inmem.Upload(context.Background(), "/file-5", strings.NewReader("nazdar")))
+	verifyIter(t, cb, allFiles, true, cfgName) // Iter returns old response.
+
+	cache.flush()
+	allFiles = append(allFiles, "/file-5")
+	verifyIter(t, cb, allFiles, false, cfgName)
+
+	cache.flush()
+
+	e := errors.Errorf("test error")
+
+	// This iteration returns an error. Result will not be cached.
+	testutil.Equals(t, e, cb.Iter(context.Background(), "/", func(_ string) error {
+		return e
+	}))
+
+	// Nothing cached now.
+	verifyIter(t, cb, allFiles, false, cfgName)
+}
+
+func verifyIter(t *testing.T, cb *CachingBucket, expectedFiles []string, expectedCache bool, cfgName string) {
+	hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName)))
+
+	col := iterCollector{}
+	testutil.Ok(t, cb.Iter(context.Background(), "/", col.collect))
+
+	hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName)))
+
+	sort.Strings(col.items)
+	testutil.Equals(t, expectedFiles, col.items)
+
+	expectedHitsDiff := 0
+	if expectedCache {
+		expectedHitsDiff = 1
+	}
+
+	testutil.Equals(t, expectedHitsDiff, hitsAfter-hitsBefore)
+}
+
+type iterCollector struct {
+	items []string
+}
+
+func (it *iterCollector) collect(s string) error {
+	it.items = append(it.items, s)
+	return nil
+}
+
+func TestExists(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "test"
+	cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	verifyExists(t, cb, testFilename, false, false, cfgName)
+
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, strings.NewReader("hej")))
+	verifyExists(t, cb, testFilename, false, true, cfgName) // Reused cache result.
+	cache.flush()
+	verifyExists(t, cb, testFilename, true, false, cfgName)
+
+	testutil.Ok(t, inmem.Delete(context.Background(), testFilename))
+	verifyExists(t, cb, testFilename, true, true, cfgName) // Reused cache result.
+	cache.flush()
+	verifyExists(t, cb, testFilename, false, false, cfgName)
+}
+
+func TestExistsCachingDisabled(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "test"
+	cfg.CacheExists(cfgName, cache, func(string) bool { return false }, 10*time.Minute, 2*time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	verifyExists(t, cb, testFilename, false, false, cfgName)
+
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, strings.NewReader("hej")))
+	verifyExists(t, cb, testFilename, true, false, cfgName)
+
+	testutil.Ok(t, inmem.Delete(context.Background(), testFilename))
+	verifyExists(t, cb, testFilename, false, false, cfgName)
+}
+
+func verifyExists(t *testing.T, cb *CachingBucket, file string, exists bool, fromCache bool, cfgName string) {
+	t.Helper()
+	hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName)))
+	ok, err := cb.Exists(context.Background(), file)
+	testutil.Ok(t, err)
+	testutil.Equals(t, exists, ok)
+	hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName)))
+
+	if fromCache {
+		testutil.Equals(t, 1, hitsAfter-hitsBefore)
+	} else {
+		testutil.Equals(t, 0, hitsAfter-hitsBefore)
+	}
+}
+
+func TestGet(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "metafile"
+	cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute)
+	cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	verifyGet(t, cb, testFilename, nil, false, cfgName)
+	verifyExists(t, cb, testFilename, false, true, cfgName)
+
+	data := []byte("hello world")
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data)))
+
+	// Even if file is now uploaded, old data is served from cache.
+	verifyGet(t, cb, testFilename, nil, true, cfgName)
+	verifyExists(t, cb, testFilename, false, true, cfgName)
+
+	cache.flush()
+
+	verifyGet(t, cb, testFilename, data, false, cfgName)
+	verifyGet(t, cb, testFilename, data, true, cfgName)
+	verifyExists(t, cb, testFilename, true, true, cfgName)
+}
+
+func TestGetTooBigObject(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "metafile"
+	// Only allow 5 bytes to be cached.
+	cfg.CacheGet(cfgName, cache, matchAll, 5, 10*time.Minute, 10*time.Minute, 2*time.Minute)
+	cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	data := []byte("hello world")
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data)))
+
+	// Object is too big, so it will not be stored to cache on first read.
+	verifyGet(t, cb, testFilename, data, false, cfgName)
+	verifyGet(t, cb, testFilename, data, false, cfgName)
+	verifyExists(t, cb, testFilename, true, true, cfgName)
+}
+
+func TestGetPartialRead(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "metafile"
+	cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute)
+	cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	data := []byte("hello world")
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data)))
+
+	// Read only few bytes from data.
+	r, err := cb.Get(context.Background(), testFilename)
+	testutil.Ok(t, err)
+	_, err = r.Read(make([]byte, 1))
+	testutil.Ok(t, err)
+	testutil.Ok(t, r.Close())
+
+	// Object wasn't cached as it wasn't fully read.
+	verifyGet(t, cb, testFilename, data, false, cfgName)
+	// VerifyGet read object, so now it's cached.
+	verifyGet(t, cb, testFilename, data, true, cfgName)
+}
+
+func verifyGet(t *testing.T, cb *CachingBucket, file string, expectedData []byte, cacheUsed bool, cfgName string) {
+	hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName)))
+
+	r, err := cb.Get(context.Background(), file)
+	if expectedData == nil {
+		testutil.Assert(t, cb.IsObjNotFoundErr(err))
+
+		hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName)))
+		if cacheUsed {
+			testutil.Equals(t, 1, hitsAfter-hitsBefore)
+		} else {
+			testutil.Equals(t, 0, hitsAfter-hitsBefore)
+		}
+	} else {
+		testutil.Ok(t, err)
+		defer runutil.CloseWithLogOnErr(nil, r, "verifyGet")
+		data, err := ioutil.ReadAll(r)
+		testutil.Ok(t, err)
+		testutil.Equals(t, expectedData, data)
+
+		hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName)))
+		if cacheUsed {
+			testutil.Equals(t, 1, hitsAfter-hitsBefore)
+		} else {
+			testutil.Equals(t, 0, hitsAfter-hitsBefore)
+		}
+	}
+}
+
+func TestObjectSize(t *testing.T) {
+	inmem := objstore.NewInMemBucket()
+
+	// We reuse cache between tests (!)
+	cache := newMockCache()
+
+	cfg := NewCachingBucketConfig()
+	const cfgName = "test"
+	cfg.CacheObjectSize(cfgName, cache, matchAll, time.Minute)
+
+	cb, err := NewCachingBucket(inmem, cfg, nil, nil)
+	testutil.Ok(t, err)
+
+	verifyObjectSize(t, cb, testFilename, -1, false, cfgName)
+	verifyObjectSize(t, cb, testFilename, -1, false, cfgName) // ObjectSize doesn't cache non-existent files.
+
+	data := []byte("hello world")
+	testutil.Ok(t, inmem.Upload(context.Background(), testFilename, bytes.NewBuffer(data)))
+
+	verifyObjectSize(t, cb, testFilename, len(data), false, cfgName)
+	verifyObjectSize(t, cb, testFilename, len(data), true, cfgName)
+}
+
+func verifyObjectSize(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) {
+	t.Helper()
+	hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName)))
+
+	length, err := cb.ObjectSize(context.Background(), file)
+	if expectedLength < 0 {
+		testutil.Assert(t, cb.IsObjNotFoundErr(err))
+	} else {
+		testutil.Ok(t, err)
+		testutil.Equals(t, uint64(expectedLength), length)
+
+		hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opObjectSize, cfgName)))
+		if cacheUsed {
+			testutil.Equals(t, 1, hitsAfter-hitsBefore)
+		} else {
+			testutil.Equals(t, 0, hitsAfter-hitsBefore)
+		}
+	}
+}
+
+func matchAll(string) bool { return true }
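Since IterCodec is an exported extension point, callers are not limited to the JSONIterCodec shipped by this patch. As a closing sketch (an assumption-level illustration only, written as if inside package storecache with strings and github.com/pkg/errors imported), a newline-delimited codec could trade generality for compactness when object names are guaranteed not to contain newlines:

// LinesIterCodec stores Iter results as newline-separated names. It is only
// safe for buckets whose object names can never contain '\n'.
type LinesIterCodec struct{}

func (LinesIterCodec) Encode(files []string) ([]byte, error) {
	for _, f := range files {
		if strings.ContainsRune(f, '\n') {
			return nil, errors.Errorf("object name contains newline: %q", f)
		}
	}
	return []byte(strings.Join(files, "\n")), nil
}

func (LinesIterCodec) Decode(data []byte) ([]string, error) {
	if len(data) == 0 {
		return nil, nil
	}
	return strings.Split(string(data), "\n"), nil
}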