From d315347cf6c8e24a762efbd71d8451845edf73e9 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 17 Jul 2020 17:07:32 -0400 Subject: [PATCH] refactor replicate execution Signed-off-by: Ben Ye --- pkg/replicate/scheme.go | 82 +++++------------------------------------ 1 file changed, 9 insertions(+), 73 deletions(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 2a97c0a5e8c..034f8d99067 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -59,6 +59,7 @@ func NewBlockFilter( // Filter return true if block is non-compacted and matches selector. func (bf *BlockFilter) Filter(b *metadata.Meta) bool { + // TODO(bwplotka): Allow injecting custom labels as shipper does. if len(b.Thanos.Labels) == 0 { level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty") return false @@ -117,10 +118,6 @@ type replicationScheme struct { } type replicationMetrics struct { - originIterations prometheus.Counter - originMetaLoads prometheus.Counter - originPartialMeta prometheus.Counter - blocksAlreadyReplicated prometheus.Counter blocksReplicated prometheus.Counter objectsReplicated prometheus.Counter @@ -128,18 +125,6 @@ type replicationMetrics struct { func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { m := &replicationMetrics{ - originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_iterations_total", - Help: "Total number of objects iterated over in the origin bucket.", - }), - originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_meta_loads_total", - Help: "Total number of meta.json reads in the origin bucket.", - }), - originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_origin_partial_meta_reads_total", - Help: "Total number of partial meta reads encountered.", - }), blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_replicate_blocks_already_replicated_total", Help: "Total number of blocks skipped due to already being replicated.", @@ -183,45 +168,20 @@ func newReplicationScheme( func (rs *replicationScheme) execute(ctx context.Context) error { availableBlocks := []*metadata.Meta{} - level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication") - - if err := rs.fromBkt.Iter(ctx, "", func(name string) error { - rs.metrics.originIterations.Inc() - - id, ok := thanosblock.IsBlockDir(name) - if !ok { - return nil - } - - rs.metrics.originMetaLoads.Inc() - - meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id) - if metaNonExistentOrPartial { - // meta.json is the last file uploaded by a Thanos shipper, - // therefore a block may be partially present, but no meta.json - // file yet. If this is the case we skip that block for now. - rs.metrics.originPartialMeta.Inc() - level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) - return nil - } - if err != nil { - return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String()) - } + metas, partials, err := rs.fetcher.Fetch(ctx) + if err != nil && metas == nil { + return err + } - if len(meta.Thanos.Labels) == 0 { - // TODO(bwplotka): Allow injecting custom labels as shipper does. - level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String()) - return nil - } + for id, partialError := range partials { + level.Info(rs.logger).Log("msg", "failed to fetch block meta. Skipping.", "block_uuid", id.String(), "err", partialError) + } + for id, meta := range metas { if rs.blockFilter(meta) { level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) availableBlocks = append(availableBlocks, meta) } - - return nil - }); err != nil { - return errors.Wrap(err, "iterate over origin bucket") } // In order to prevent races in compactions by the target environment, we @@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN return nil } - -// loadMeta loads the meta.json from the origin bucket and returns the meta -// struct as well as if failed, whether the failure was due to the meta.json -// not being present or partial. The distinction is important, as if missing or -// partial, this is just a temporary failure, as the block is still being -// uploaded to the origin bucket. -func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) { - metas, _, err := rs.fetcher.Fetch(ctx) - if err != nil { - switch errors.Cause(err) { - default: - return nil, false, errors.Wrap(err, "fetch meta") - case thanosblock.ErrorSyncMetaNotFound: - return nil, true, errors.Wrap(err, "fetch meta") - } - } - - m, ok := metas[id] - if !ok { - return nil, true, errors.Wrap(err, "fetch meta") - } - - return m, false, nil -}