-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor replicate execution method to not iterate the origin bucket #2906
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,29 +117,13 @@ type replicationScheme struct { | |
} | ||
|
||
type replicationMetrics struct { | ||
originIterations prometheus.Counter | ||
originMetaLoads prometheus.Counter | ||
originPartialMeta prometheus.Counter | ||
|
||
blocksAlreadyReplicated prometheus.Counter | ||
blocksReplicated prometheus.Counter | ||
objectsReplicated prometheus.Counter | ||
} | ||
|
||
func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { | ||
m := &replicationMetrics{ | ||
originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
Name: "thanos_replicate_origin_iterations_total", | ||
Help: "Total number of objects iterated over in the origin bucket.", | ||
}), | ||
originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Name: "thanos_replicate_origin_meta_loads_total", | ||
Help: "Total number of meta.json reads in the origin bucket.", | ||
}), | ||
originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
Name: "thanos_replicate_origin_partial_meta_reads_total", | ||
Help: "Total number of partial meta reads encountered.", | ||
}), | ||
blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
Name: "thanos_replicate_blocks_already_replicated_total", | ||
Help: "Total number of blocks skipped due to already being replicated.", | ||
|
@@ -183,45 +167,20 @@ func newReplicationScheme( | |
func (rs *replicationScheme) execute(ctx context.Context) error { | ||
availableBlocks := []*metadata.Meta{} | ||
|
||
level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication") | ||
|
||
if err := rs.fromBkt.Iter(ctx, "", func(name string) error { | ||
rs.metrics.originIterations.Inc() | ||
|
||
id, ok := thanosblock.IsBlockDir(name) | ||
if !ok { | ||
return nil | ||
} | ||
|
||
rs.metrics.originMetaLoads.Inc() | ||
|
||
meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id) | ||
if metaNonExistentOrPartial { | ||
// meta.json is the last file uploaded by a Thanos shipper, | ||
// therefore a block may be partially present, but no meta.json | ||
// file yet. If this is the case we skip that block for now. | ||
rs.metrics.originPartialMeta.Inc() | ||
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) | ||
return nil | ||
} | ||
if err != nil { | ||
return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String()) | ||
} | ||
metas, partials, err := rs.fetcher.Fetch(ctx) | ||
if err != nil { | ||
return err | ||
kakkoyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps we should log the error if … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is good to log the error here, but it would be somewhat of a duplicate. I just checked the code: this is the only place that returns metas and an error at the same time. In this case, partials != nil, so we will log the partial meta errors later. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. hm the …
On the other hand, shouldn't replicate FAIL on any error spotted, even if we failed to read just one block? Sometimes we rely heavily on replication status, so this has to be heavily tested and controlled, ideally with a failure metric somewhere (: |
||
|
||
if len(meta.Thanos.Labels) == 0 { | ||
// TODO(bwplotka): Allow injecting custom labels as shipper does. | ||
level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String()) | ||
return nil | ||
} | ||
for id := range partials { | ||
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String()) | ||
} | ||
|
||
for id, meta := range metas { | ||
if rs.blockFilter(meta) { | ||
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) | ||
availableBlocks = append(availableBlocks, meta) | ||
} | ||
|
||
return nil | ||
}); err != nil { | ||
return errors.Wrap(err, "iterate over origin bucket") | ||
} | ||
|
||
// In order to prevent races in compactions by the target environment, we | ||
|
@@ -266,6 +225,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli | |
return errors.Wrap(err, "get meta file from target bucket") | ||
} | ||
|
||
// TODO(bwplotka): Allow injecting custom labels as shipper does. | ||
originMetaFileContent, err := ioutil.ReadAll(originMetaFile) | ||
if err != nil { | ||
return errors.Wrap(err, "read origin meta file") | ||
|
@@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN | |
|
||
return nil | ||
} | ||
|
||
// loadMeta loads the meta.json from the origin bucket and returns the meta | ||
// struct as well as if failed, whether the failure was due to the meta.json | ||
// not being present or partial. The distinction is important, as if missing or | ||
// partial, this is just a temporary failure, as the block is still being | ||
// uploaded to the origin bucket. | ||
func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) { | ||
metas, _, err := rs.fetcher.Fetch(ctx) | ||
if err != nil { | ||
switch errors.Cause(err) { | ||
default: | ||
return nil, false, errors.Wrap(err, "fetch meta") | ||
case thanosblock.ErrorSyncMetaNotFound: | ||
return nil, true, errors.Wrap(err, "fetch meta") | ||
} | ||
} | ||
|
||
m, ok := metas[id] | ||
if !ok { | ||
return nil, true, errors.Wrap(err, "fetch meta") | ||
} | ||
|
||
return m, false, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically there is no need to mention the refactor, just the metric changes (: