From 65237673b79c29f0850ea40f21542d6b77329d9f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 17 Jul 2020 17:07:32 -0400 Subject: [PATCH 1/3] Refactor replicate execution Signed-off-by: Ben Ye --- CHANGELOG.md | 6 ++ examples/dashboards/bucket-replicate.json | 12 +-- mixin/dashboards/bucket-replicate.libsonnet | 7 +- pkg/replicate/scheme.go | 82 +++------------------ 4 files changed, 20 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca49a6d43e..4364f7868c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3331](https://github.com/thanos-io/thanos/pull/3331) Disable Azure blob exception logging - [#3341](https://github.com/thanos-io/thanos/pull/3341) Disable Azure blob syslog exception logging +### Changed + +- [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics. + - `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`. + - `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`. + ## [v0.16.0](https://github.com/thanos-io/thanos/releases) - 2020.10.26 Highlights: diff --git a/examples/dashboards/bucket-replicate.json b/examples/dashboards/bucket-replicate.json index 22f51cd477..b45750761e 100644 --- a/examples/dashboards/bucket-replicate.json +++ b/examples/dashboards/bucket-replicate.json @@ -305,15 +305,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(rate(thanos_replicate_origin_iterations_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "iterations", - "legendLink": null, - "step": 10 - }, - { - "expr": "sum(rate(thanos_replicate_origin_meta_loads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "expr": "sum(rate(blocks_meta_synced{state=\"loaded\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", "format": "time_series", "intervalFactor": 2, "legendFormat": "meta loads", @@ -321,7 +313,7 @@ "step": 10 }, { - "expr": "sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", + "expr": "sum(rate(blocks_meta_synced{state=\"failed\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))", "format": "time_series", "intervalFactor": 2, "legendFormat": "partial meta reads", diff --git a/mixin/dashboards/bucket-replicate.libsonnet b/mixin/dashboards/bucket-replicate.libsonnet index a0a668baa9..1657c98cc8 100644 --- a/mixin/dashboards/bucket-replicate.libsonnet +++ b/mixin/dashboards/bucket-replicate.libsonnet @@ -39,14 +39,13 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet'; g.panel('Metrics') + g.queryPanel( [ - 'sum(rate(thanos_replicate_origin_iterations_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, - 'sum(rate(thanos_replicate_origin_meta_loads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, - 'sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(blocks_meta_synced{state="loaded",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, + 'sum(rate(blocks_meta_synced{state="failed",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, 'sum(rate(thanos_replicate_blocks_already_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, 'sum(rate(thanos_replicate_blocks_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, 'sum(rate(thanos_replicate_objects_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate, ], - ['iterations', 'meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects'] + ['meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects'] ) ) ) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 2a97c0a5e8..86e65040dc 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -117,10 +117,6 @@ type replicationScheme struct { } type replicationMetrics struct { - originIterations prometheus.Counter - originMetaLoads prometheus.Counter - originPartialMeta prometheus.Counter - blocksAlreadyReplicated prometheus.Counter blocksReplicated prometheus.Counter objectsReplicated prometheus.Counter @@ -128,18 +124,6 @@ type replicationMetrics struct { func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { m := &replicationMetrics{ - originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_iterations_total", - Help: "Total number of objects iterated over in the origin bucket.", - }), - originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_meta_loads_total", - Help: "Total number of meta.json reads in the origin bucket.", - }), - originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_partial_meta_reads_total", - Help: "Total number of partial meta reads encountered.", - }), blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_replicate_blocks_already_replicated_total", Help: "Total number of blocks skipped due to already being replicated.", @@ -183,45 +167,20 @@ func newReplicationScheme( func (rs *replicationScheme) execute(ctx context.Context) error { availableBlocks := []*metadata.Meta{} - level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication") - - if err := rs.fromBkt.Iter(ctx, "", func(name string) error { - rs.metrics.originIterations.Inc() - - id, ok := thanosblock.IsBlockDir(name) - if !ok { - return nil - } - - rs.metrics.originMetaLoads.Inc() - - meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id) - if metaNonExistentOrPartial { - // meta.json is the last file uploaded by a Thanos shipper, - // therefore a block may be partially present, but no meta.json - // file yet. If this is the case we skip that block for now. - rs.metrics.originPartialMeta.Inc() - level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) - return nil - } - if err != nil { - return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String()) - } + metas, partials, err := rs.fetcher.Fetch(ctx) + if err != nil && metas == nil { + return err + } - if len(meta.Thanos.Labels) == 0 { - // TODO(bwplotka): Allow injecting custom labels as shipper does. - level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String()) - return nil - } + for id, partialError := range partials { + level.Info(rs.logger).Log("msg", "failed to fetch block meta. Skipping.", "block_uuid", id.String(), "err", partialError) + } + for id, meta := range metas { if rs.blockFilter(meta) { level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) availableBlocks = append(availableBlocks, meta) } - - return nil - }); err != nil { - return errors.Wrap(err, "iterate over origin bucket") } // In order to prevent races in compactions by the target environment, we @@ -266,6 +225,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli return errors.Wrap(err, "get meta file from target bucket") } + // TODO(bwplotka): Allow injecting custom labels as shipper does. originMetaFileContent, err := ioutil.ReadAll(originMetaFile) if err != nil { return errors.Wrap(err, "read origin meta file") @@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN return nil } - -// loadMeta loads the meta.json from the origin bucket and returns the meta -// struct as well as if failed, whether the failure was due to the meta.json -// not being present or partial. The distinction is important, as if missing or -// partial, this is just a temporary failure, as the block is still being -// uploaded to the origin bucket. -func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) { - metas, _, err := rs.fetcher.Fetch(ctx) - if err != nil { - switch errors.Cause(err) { - default: - return nil, false, errors.Wrap(err, "fetch meta") - case thanosblock.ErrorSyncMetaNotFound: - return nil, true, errors.Wrap(err, "fetch meta") - } - } - - m, ok := metas[id] - if !ok { - return nil, true, errors.Wrap(err, "fetch meta") - } - - return m, false, nil -} From 03cb6965cdea6ee3aba4c6cef6543ad27ef0720f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 13 Sep 2020 15:37:32 -0400 Subject: [PATCH 2/3] return error when fetcher gets an error Signed-off-by: Ben Ye --- pkg/replicate/scheme.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 86e65040dc..364c3af1d3 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -168,7 +168,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error { availableBlocks := []*metadata.Meta{} metas, partials, err := rs.fetcher.Fetch(ctx) - if err != nil && metas == nil { + if err != nil { return err } From f1079bed1ab178ccf8a1728214a6da181c92b87a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 30 Oct 2020 15:57:07 -0400 Subject: [PATCH 3/3] address feedback Signed-off-by: Ben Ye --- pkg/replicate/scheme.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 364c3af1d3..893c89865c 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -172,8 +172,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error { return err } - for id, partialError := range partials { - level.Info(rs.logger).Log("msg", "failed to fetch block meta. Skipping.", "block_uuid", id.String(), "err", partialError) + for id := range partials { + level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) } for id, meta := range metas {