Skip to content

Commit

Permalink
Added shipper capability to sync compacted blocks once.
Browse files Browse the repository at this point in the history
Changes:
* Use shipper metrics properly.
* Do not add compacted blocks to `uploaded` arr in thanos shipper meta if they are not uploaded.
* Added flag for syncing compacted blocks once.
* Enable working on different work dir.
* Enable using snapshot API to avoid race conditions.
* Ignore empty blocks.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Feb 8, 2019
1 parent 33b57f6 commit 81f9257
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
cmd := app.Command(name, "Bucket utility commands")

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

registerBucketVerify(m, cmd, name, objStoreConfig)
registerBucketLs(m, cmd, name, objStoreConfig)
registerBucketInspect(m, cmd, name, objStoreConfig)
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ func runRule(
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
s.Sync(ctx)
if _, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err)
}

minTime, _, err := s.Timestamps()
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
Expand Down Expand Up @@ -78,6 +80,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
peer,
rl,
*uploadCompacted,
)
}
}
Expand All @@ -97,6 +100,7 @@ func runSidecar(
objStoreConfig *pathOrContent,
peer cluster.Peer,
reloader *reloader.Reloader,
uploadCompacted bool,
) error {
var m = &promMetadata{
promURL: promURL,
Expand Down Expand Up @@ -251,14 +255,24 @@ func runSidecar(
level.Error(logger).Log("err", err)
}

s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())
var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewMigrating(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
s.Sync(ctx)
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
return nil, errors.Errorf("is 'web.enable-admin-api' flag enabled? got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (m *modelBool) UnmarshalJSON(b []byte) error {
return nil
}

// ConfiguredFlags some configured flags from /api/v1/status/flags Prometheus endpoint.
// ConfiguredFlags returns configured flags from /api/v1/status/flags Prometheus endpoint.
// Added to Prometheus from v2.2.
func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Flags, error) {
u := *base
Expand Down
Loading

0 comments on commit 81f9257

Please sign in to comment.