Skip to content

Commit

Permalink
Added shipper capability to sync compacted blocks once.
Browse files Browse the repository at this point in the history
Changes:
* Use shipper metrics properly.
* Do not add compacted blocks to `uploaded` arr in thanos shipper meta if they are not uploaded.
* Added flag for syncing compacted blocks once.
* Enable working on different work dir.
* Enable using snapshot API to avoid race conditions.
* Ignore empty blocks.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Feb 5, 2019
1 parent e3d06e1 commit bbdbe00
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 74 deletions.
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
cmd := app.Command(name, "Bucket utility commands")

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

registerBucketVerify(m, cmd, name, objStoreConfig)
registerBucketLs(m, cmd, name, objStoreConfig)
registerBucketInspect(m, cmd, name, objStoreConfig)
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ func runRule(
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
s.Sync(ctx)
if _, err := s.SyncNonCompacted(ctx); err != nil {
level.Warn(logger).Log("err", err)
}

minTime, _, err := s.Timestamps()
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

uploadCompacted := cmd.Flag("shipper.upload-compacted-once", "[Experimental] If true sidecar will try to upload old compacted blocks as well (just once). Useful for migration purposes.").Hidden().Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand Down Expand Up @@ -78,6 +80,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
peer,
rl,
name,
*uploadCompacted,
)
}
}
Expand All @@ -98,6 +101,7 @@ func runSidecar(
peer cluster.Peer,
reloader *reloader.Reloader,
component string,
uploadCompacted bool,
) error {
var m = &promMetadata{
promURL: promURL,
Expand Down Expand Up @@ -252,14 +256,24 @@ func runSidecar(
level.Error(logger).Log("err", err)
}

s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())
var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewMigrating(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
s.Sync(ctx)
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions docs/components/bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ Flags:
store configuration in YAML.

Subcommands:
bucket sync [<flags>]
[Experimental] Synchronise all blocks with remote bucket. NOTE: Currently NO
compactor must be running in the same time.

bucket verify [<flags>]
Verify all blocks in the bucket against specified issues

Expand Down
1 change: 1 addition & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
CompactorRepairSource SourceType = "compactor.repair"
RulerSource SourceType = "ruler"
BucketRepairSource SourceType = "bucket.repair"
BucketSyncSource SourceType = "bucket.sync"
TestSource SourceType = "test"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
return nil, errors.Errorf("is 'web.enable-admin-api' flag enabled? got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (m *modelBool) UnmarshalJSON(b []byte) error {
return nil
}

// ConfiguredFlags some configured flags from /api/v1/status/flags Prometheus endpoint.
// ConfiguredFlags returns configured flags from /api/v1/status/flags Prometheus endpoint.
// Added to Prometheus from v2.2.
func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Flags, error) {
u := *base
Expand Down
Loading

0 comments on commit bbdbe00

Please sign in to comment.