Skip to content

Commit

Permalink
refactor replicate execution
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jul 17, 2020
1 parent 5dd97a7 commit d315347
Showing 1 changed file with 9 additions and 73 deletions.
82 changes: 9 additions & 73 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewBlockFilter(

// Filter return true if block is non-compacted and matches selector.
func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
// TODO(bwplotka): Allow injecting custom labels as shipper does.
if len(b.Thanos.Labels) == 0 {
level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty")
return false
Expand Down Expand Up @@ -117,29 +118,13 @@ type replicationScheme struct {
}

type replicationMetrics struct {
originIterations prometheus.Counter
originMetaLoads prometheus.Counter
originPartialMeta prometheus.Counter

blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
m := &replicationMetrics{
originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_iterations_total",
Help: "Total number of objects iterated over in the origin bucket.",
}),
originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_meta_loads_total",
Help: "Total number of meta.json reads in the origin bucket.",
}),
originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_partial_meta_reads_total",
Help: "Total number of partial meta reads encountered.",
}),
blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_already_replicated_total",
Help: "Total number of blocks skipped due to already being replicated.",
Expand Down Expand Up @@ -183,45 +168,20 @@ func newReplicationScheme(
func (rs *replicationScheme) execute(ctx context.Context) error {
availableBlocks := []*metadata.Meta{}

level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication")

if err := rs.fromBkt.Iter(ctx, "", func(name string) error {
rs.metrics.originIterations.Inc()

id, ok := thanosblock.IsBlockDir(name)
if !ok {
return nil
}

rs.metrics.originMetaLoads.Inc()

meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id)
if metaNonExistentOrPartial {
// meta.json is the last file uploaded by a Thanos shipper,
// therefore a block may be partially present, but no meta.json
// file yet. If this is the case we skip that block for now.
rs.metrics.originPartialMeta.Inc()
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
return nil
}
if err != nil {
return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String())
}
metas, partials, err := rs.fetcher.Fetch(ctx)
if err != nil && metas == nil {
return err
}

if len(meta.Thanos.Labels) == 0 {
// TODO(bwplotka): Allow injecting custom labels as shipper does.
level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String())
return nil
}
for id, partialError := range partials {
level.Info(rs.logger).Log("msg", "failed to fetch block meta. Skipping.", "block_uuid", id.String(), "err", partialError)
}

for id, meta := range metas {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

// In order to prevent races in compactions by the target environment, we
Expand Down Expand Up @@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN

return nil
}

// loadMeta loads the meta.json from the origin bucket and returns the meta
// struct as well as if failed, whether the failure was due to the meta.json
// not being present or partial. The distinction is important, as if missing or
// partial, this is just a temporary failure, as the block is still being
// uploaded to the origin bucket.
func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) {
metas, _, err := rs.fetcher.Fetch(ctx)
if err != nil {
switch errors.Cause(err) {
default:
return nil, false, errors.Wrap(err, "fetch meta")
case thanosblock.ErrorSyncMetaNotFound:
return nil, true, errors.Wrap(err, "fetch meta")
}
}

m, ok := metas[id]
if !ok {
return nil, true, errors.Wrap(err, "fetch meta")
}

return m, false, nil
}

0 comments on commit d315347

Please sign in to comment.