diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index f5a436cc2c..6c727252b7 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -17,9 +17,20 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/runutil" ) +const ( + OpIter = "iter" + OpGet = "get" + OpGetRange = "get_range" + OpExists = "exists" + OpUpload = "upload" + OpDelete = "delete" + OpAttributes = "attributes" +) + // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { @@ -218,16 +229,6 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, return nil } -const ( - iterOp = "iter" - getOp = "get" - getRangeOp = "get_range" - existsOp = "exists" - uploadOp = "upload" - deleteOp = "delete" - attributesOp = "attributes" -) - // IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment thanos_objstore_bucket_operation_failures_total metric. type IsOpFailureExpectedFunc func(error) bool @@ -263,13 +264,13 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric }, []string{"bucket"}), } for _, op := range []string{ - iterOp, - getOp, - getRangeOp, - existsOp, - uploadOp, - deleteOp, - attributesOp, + OpIter, + OpGet, + OpGetRange, + OpExists, + OpUpload, + OpDelete, + OpAttributes, } { bkt.ops.WithLabelValues(op) bkt.opsFailures.WithLabelValues(op) @@ -306,7 +307,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket } func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error { - const op = iterOp + const op = OpIter b.ops.WithLabelValues(op).Inc() err := b.bkt.Iter(ctx, dir, f) @@ -317,7 +318,7 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) } func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { - const op = attributesOp + const op = OpAttributes b.ops.WithLabelValues(op).Inc() start := time.Now() @@ -333,7 +334,7 @@ func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttri } func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - const op = getOp + const op = OpGet b.ops.WithLabelValues(op).Inc() rc, err := b.bkt.Get(ctx, name) @@ -353,7 +354,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err } func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - const op = getRangeOp + const op = OpGetRange b.ops.WithLabelValues(op).Inc() rc, err := b.bkt.GetRange(ctx, name, off, length) @@ -373,7 +374,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in } func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { - const op = existsOp + const op = OpExists b.ops.WithLabelValues(op).Inc() start := time.Now() @@ -389,7 +390,7 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { } func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error { - const op = uploadOp + const op = OpUpload b.ops.WithLabelValues(op).Inc() start := time.Now() @@ -405,7 +406,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err } func (b *metricBucket) Delete(ctx context.Context, name string) error { - const op = deleteOp + const op = OpDelete b.ops.WithLabelValues(op).Inc() start := time.Now() diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index c007cccc74..73ba167135 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -7,6 +7,7 @@ import ( "testing" promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/thanos/pkg/testutil" ) @@ -18,21 +19,21 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr)) - testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp))) - testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp))) - testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp))) - testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp))) - testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) - testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) - testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) + testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter))) + testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes))) + testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet))) + testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange))) + testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists))) + testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp))) - testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes))) + testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime) @@ -41,23 +42,23 @@ func TestMetricBucket_Close(t *testing.T) { // Clear bucket, but don't clear metrics to ensure we use same. bkt.bkt = NewInMemBucket() AcceptanceTest(t, bkt) - testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp))) - testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp))) - testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp))) - testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp))) - testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp))) - testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp))) - testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp))) + testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter))) + testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes))) + testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet))) + testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange))) + testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists))) + testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops)) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter))) // Not expected not found error here. - testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp))) + testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes))) // Not expected not found errors, this should increment failure metric on get for not found as well, so +2. - testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp))) - testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp))) + testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload))) + testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete))) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures)) testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) diff --git a/pkg/store/cache/caching_bucket.go b/pkg/store/cache/caching_bucket.go index 4005925eb9..aa895a3a45 100644 --- a/pkg/store/cache/caching_bucket.go +++ b/pkg/store/cache/caching_bucket.go @@ -29,12 +29,6 @@ import ( const ( originCache = "cache" originBucket = "bucket" - - opGet = "get" - opGetRange = "getrange" - opIter = "iter" - opExists = "exists" - opAttributes = "attributes" ) var errObjNotFound = errors.Errorf("object not found") @@ -97,7 +91,7 @@ func NewCachingBucket(b objstore.Bucket, cfg *CachingBucketConfig, logger log.Lo cb.operationRequests.WithLabelValues(op, n) cb.operationHits.WithLabelValues(op, n) - if op == opGetRange { + if op == objstore.OpGetRange { cb.requestedGetRangeBytes.WithLabelValues(n) cb.fetchedGetRangeBytes.WithLabelValues(originCache, n) cb.fetchedGetRangeBytes.WithLabelValues(originBucket, n) @@ -135,14 +129,14 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er return cb.Bucket.Iter(ctx, dir, f) } - cb.operationRequests.WithLabelValues(opIter, cfgName).Inc() + cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc() key := cachingKeyIter(dir) data := cfg.cache.Fetch(ctx, []string{key}) if data[key] != nil { list, err := cfg.codec.Decode(data[key]) if err == nil { - cb.operationHits.WithLabelValues(opIter, cfgName).Inc() + cb.operationHits.WithLabelValues(objstore.OpIter, cfgName).Inc() for _, n := range list { if err := f(n); err != nil { return err @@ -180,7 +174,7 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) return cb.Bucket.Exists(ctx, name) } - cb.operationRequests.WithLabelValues(opExists, cfgName).Inc() + cb.operationRequests.WithLabelValues(objstore.OpExists, cfgName).Inc() key := cachingKeyExists(name) hits := cfg.cache.Fetch(ctx, []string{key}) @@ -188,7 +182,7 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) if ex := hits[key]; ex != nil { exists, err := strconv.ParseBool(string(ex)) if err == nil { - cb.operationHits.WithLabelValues(opExists, cfgName).Inc() + cb.operationHits.WithLabelValues(objstore.OpExists, cfgName).Inc() return exists, nil } level.Warn(cb.logger).Log("msg", "unexpected cached 'exists' value", "key", key, "val", string(ex)) @@ -222,21 +216,21 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e return cb.Bucket.Get(ctx, name) } - cb.operationRequests.WithLabelValues(opGet, cfgName).Inc() + cb.operationRequests.WithLabelValues(objstore.OpGet, cfgName).Inc() contentKey := cachingKeyContent(name) existsKey := cachingKeyExists(name) hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey}) if hits[contentKey] != nil { - cb.operationHits.WithLabelValues(opGet, cfgName).Inc() + cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc() return ioutil.NopCloser(bytes.NewReader(hits[contentKey])), nil } // If we know that file doesn't exist, we can return that. Useful for deletion marks. if ex := hits[existsKey]; ex != nil { if exists, err := strconv.ParseBool(string(ex)); err == nil && !exists { - cb.operationHits.WithLabelValues(opGet, cfgName).Inc() + cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc() return nil, errObjNotFound } } @@ -294,14 +288,14 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore. func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) { key := cachingKeyAttributes(name) - cb.operationRequests.WithLabelValues(opAttributes, cfgName).Inc() + cb.operationRequests.WithLabelValues(objstore.OpAttributes, cfgName).Inc() hits := cache.Fetch(ctx, []string{key}) if raw, ok := hits[key]; ok { var attrs objstore.ObjectAttributes err := json.Unmarshal(raw, &attrs) if err == nil { - cb.operationHits.WithLabelValues(opAttributes, cfgName).Inc() + cb.operationHits.WithLabelValues(objstore.OpAttributes, cfgName).Inc() return attrs, nil } @@ -323,7 +317,7 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgN } func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) { - cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc() + cb.operationRequests.WithLabelValues(objstore.OpGetRange, cfgName).Inc() cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length)) attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.attributesTTL) @@ -376,7 +370,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset totalCachedBytes += int64(len(b)) } cb.fetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(totalCachedBytes)) - cb.operationHits.WithLabelValues(opGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys))) + cb.operationHits.WithLabelValues(objstore.OpGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys))) if len(hits) < len(keys) { if hits == nil { diff --git a/pkg/store/cache/caching_bucket_config.go b/pkg/store/cache/caching_bucket_config.go index fbc19bfaf7..d8e089d7e2 100644 --- a/pkg/store/cache/caching_bucket_config.go +++ b/pkg/store/cache/caching_bucket_config.go @@ -7,6 +7,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/objstore" ) // Codec for encoding and decoding results of Iter call. @@ -145,19 +146,19 @@ func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.C func (cfg *CachingBucketConfig) allConfigNames() map[string][]string { result := map[string][]string{} for n := range cfg.get { - result[opGet] = append(result[opGet], n) + result[objstore.OpGet] = append(result[objstore.OpGet], n) } for n := range cfg.iter { - result[opIter] = append(result[opIter], n) + result[objstore.OpIter] = append(result[objstore.OpIter], n) } for n := range cfg.exists { - result[opExists] = append(result[opExists], n) + result[objstore.OpExists] = append(result[objstore.OpExists], n) } for n := range cfg.getRange { - result[opGetRange] = append(result[opGetRange], n) + result[objstore.OpGetRange] = append(result[objstore.OpGetRange], n) } for n := range cfg.attributes { - result[opAttributes] = append(result[opAttributes], n) + result[objstore.OpAttributes] = append(result[objstore.OpAttributes], n) } return result } diff --git a/pkg/store/cache/caching_bucket_test.go b/pkg/store/cache/caching_bucket_test.go index ff5f36377b..59c0cc25e0 100644 --- a/pkg/store/cache/caching_bucket_test.go +++ b/pkg/store/cache/caching_bucket_test.go @@ -410,12 +410,12 @@ func TestCachedIter(t *testing.T) { } func verifyIter(t *testing.T, cb *CachingBucket, expectedFiles []string, expectedCache bool, cfgName string) { - hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName))) + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpIter, cfgName))) col := iterCollector{} testutil.Ok(t, cb.Iter(context.Background(), "/", col.collect)) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opIter, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpIter, cfgName))) sort.Strings(col.items) testutil.Equals(t, expectedFiles, col.items) @@ -487,11 +487,11 @@ func TestExistsCachingDisabled(t *testing.T) { func verifyExists(t *testing.T, cb *CachingBucket, file string, exists bool, fromCache bool, cfgName string) { t.Helper() - hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName))) + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpExists, cfgName))) ok, err := cb.Exists(context.Background(), file) testutil.Ok(t, err) testutil.Equals(t, exists, ok) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opExists, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpExists, cfgName))) if fromCache { testutil.Equals(t, 1, hitsAfter-hitsBefore) @@ -585,13 +585,13 @@ func TestGetPartialRead(t *testing.T) { } func verifyGet(t *testing.T, cb *CachingBucket, file string, expectedData []byte, cacheUsed bool, cfgName string) { - hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpGet, cfgName))) r, err := cb.Get(context.Background(), file) if expectedData == nil { testutil.Assert(t, cb.IsObjNotFoundErr(err)) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpGet, cfgName))) if cacheUsed { testutil.Equals(t, 1, hitsAfter-hitsBefore) } else { @@ -604,7 +604,7 @@ func verifyGet(t *testing.T, cb *CachingBucket, file string, expectedData []byte testutil.Ok(t, err) testutil.Equals(t, expectedData, data) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opGet, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpGet, cfgName))) if cacheUsed { testutil.Equals(t, 1, hitsAfter-hitsBefore) } else { @@ -638,7 +638,7 @@ func TestAttributes(t *testing.T) { func verifyObjectAttrs(t *testing.T, cb *CachingBucket, file string, expectedLength int, cacheUsed bool, cfgName string) { t.Helper() - hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName))) + hitsBefore := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpAttributes, cfgName))) attrs, err := cb.Attributes(context.Background(), file) if expectedLength < 0 { @@ -647,7 +647,7 @@ func verifyObjectAttrs(t *testing.T, cb *CachingBucket, file string, expectedLen testutil.Ok(t, err) testutil.Equals(t, int64(expectedLength), attrs.Size) - hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(opAttributes, cfgName))) + hitsAfter := int(promtest.ToFloat64(cb.operationHits.WithLabelValues(objstore.OpAttributes, cfgName))) if cacheUsed { testutil.Equals(t, 1, hitsAfter-hitsBefore) } else {