Skip to content

Commit

Permalink
bucket: Improved bucket flag handling when no store is specifed, simp…
Browse files Browse the repository at this point in the history
…lified factory anf bucket commands code.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jan 30, 2019
1 parent 5e5e353 commit 7b72b6d
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Fixed

- [#745](https://github.com/improbable-eng/thanos/pull/745) - Fixed race conditions and edge cases for Thanos Querier fanout logic.
- [#396](https://github.com/improbable-eng/thanos/issues/396) - Fixed sidecar missing proxying samples if Prometheus result for single series was longer than 2^16
- [#649](https://github.com/improbable-eng/thanos/issues/649) - Fixed store label values api to add also external label values.
- [#708](https://github.com/improbable-eng/thanos/issues/708) - `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation is no longer set by default which was breaking some providers handled by minio client.

Expand Down
74 changes: 44 additions & 30 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
Expand Down Expand Up @@ -46,44 +47,53 @@ var (
)

func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Inspect metric data in an object storage bucket")
cmd := app.Command(name, "Bucket utility commands")

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreBackupConfig := regCommonObjStoreFlags(cmd, "-backup")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

// Verify command.
verify := cmd.Command("verify", "Verify all blocks in the bucket against specified issues")
verifyRepair := verify.Flag("repair", "Attempt to repair blocks for which issues were detected").
registerBucketVerify(m, cmd, name, objStoreConfig)
registerBucketLs(m, cmd, name, objStoreConfig)
registerBucketInspect(m, cmd, name, objStoreConfig)
return
}

func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("verify", "Verify all blocks in the bucket against specified issues")
objStoreBackupConfig := regCommonObjStoreFlags(cmd, "-backup", false, "Used for repair logic to backup blocks before removal.")
repair := cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").Bool()
verifyIssues := verify.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())).
issuesToVerify := cmd.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())).
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
verifyIDWhitelist := verify.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, bucketConfig, reg, name)
bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

backupBucketConfig, err := objStoreBackupConfig.Content()
backupconfContentYaml, err := objStoreBackupConfig.Content()
if err != nil {
return err
}

backupBkt, err := client.NewBucket(logger, backupBucketConfig, reg, name)
if err == client.ErrNotFound {
if *verifyRepair {
var backupBkt objstore.Bucket
if len(backupconfContentYaml) == 0 {
if *repair {
return errors.Wrap(err, "repair is specified, so backup client is required")
}
} else if err != nil {
return err
} else {
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name)
if err != nil {
return err
}

defer runutil.CloseWithLogOnErr(logger, backupBkt, "backup bucket client")
}

Expand All @@ -96,24 +106,24 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
issues []verifier.Issue
)

for _, i := range *verifyIssues {
for _, i := range *issuesToVerify {
issueFn, ok := issuesMap[i]
if !ok {
return errors.Errorf("no such issue name %s", i)
}
issues = append(issues, issueFn)
}

if *verifyRepair {
if *repair {
v = verifier.NewWithRepair(logger, bkt, backupBkt, issues)
} else {
v = verifier.New(logger, bkt, issues)
}

var idMatcher func(ulid.ULID) bool = nil
if len(*verifyIDWhitelist) > 0 {
if len(*idWhitelist) > 0 {
whilelistIDs := map[string]struct{}{}
for _, bid := range *verifyIDWhitelist {
for _, bid := range *idWhitelist {
id, err := ulid.Parse(bid)
if err != nil {
return errors.Wrap(err, "invalid ULID found in --id-whitelist flag")
Expand All @@ -131,17 +141,19 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin

return v.Verify(ctx, idMatcher)
}
}

ls := cmd.Command("ls", "List all blocks in the bucket")
lsOutput := ls.Flag("output", "Format in which to print each block's information. May be 'json' or custom template.").
func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("ls", "List all blocks in the bucket")
output := cmd.Flag("output", "Optional format in which to print each block's information. Options are 'json', 'wide' or a custom template.").
Short('o').Default("").String()
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, bucketConfig, reg, name)
bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return err
}
Expand All @@ -155,7 +167,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()

var (
format = *lsOutput
format = *output
printBlock func(id ulid.ULID) error
)

Expand Down Expand Up @@ -220,11 +232,13 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return printBlock(id)
})
}
}

inspect := cmd.Command("inspect", "Inspect all blocks in the bucket")
selector := inspect.Flag("selector", "Selects blocks based on label, e.g. '-l key1=\"value1\" -l key2=\"value2\"'. All key value pairs must match.").Short('l').
func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("inspect", "Inspect all blocks in the bucket in detailed, table-like way")
selector := cmd.Flag("selector", "Selects blocks based on label, e.g. '-l key1=\"value1\" -l key2=\"value2\"'. All key value pairs must match.").Short('l').
PlaceHolder("<name>=\"<value>\"").Strings()
sortBy := inspect.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value.").
sortBy := cmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value.").
Default("FROM", "UNTIL").Enums(inspectColumns...)

m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
Expand All @@ -235,12 +249,12 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return fmt.Errorf("error parsing selector flag: %v", err)
}

bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, bucketConfig, reg, name)
bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
Default("30m"))
Expand Down Expand Up @@ -140,12 +140,12 @@ func runCompact(
reg.MustRegister(halted)
reg.MustRegister(retried)

bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application, name s
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runDownsample(g, logger, reg, *dataDir, objStoreConfig, name)
Expand All @@ -47,12 +47,12 @@ func runDownsample(
objStoreConfig *pathOrContent,
component string,
) error {
bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
return err
}
Expand Down
42 changes: 28 additions & 14 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,42 +134,56 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {
}

type pathOrContent struct {
name string
fileFlagName string
contentFlagName string

path *string
content *string
required bool
path *string
content *string
}

// Content returns content of the file. Flag that specifies path has priority.
// It returns error if the content is empty and required flag is set to true.
func (p *pathOrContent) Content() ([]byte, error) {
if len(*p.path) > 0 && len(*p.content) > 0 {
return nil, errors.Errorf("Both file and content are set for %s", p.name)
return nil, errors.Errorf("Both %s and %s flags set.", p.fileFlagName, p.contentFlagName)
}

var content []byte
if len(*p.path) > 0 {
c, err := ioutil.ReadFile(*p.path)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("loading YAML file %s for %s", *p.path, p.name))
return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, p.fileFlagName)
}
return c, nil
content = c
} else {
content = []byte(*p.content)
}

if len(*p.content) > 0 {
return []byte(*p.content), nil
if len(content) == 0 && p.required {
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", p.fileFlagName, p.contentFlagName)
}

return nil, nil
return content, nil
}

func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string) *pathOrContent {
func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *pathOrContent {
fileFlagName := fmt.Sprintf("objstore%s.config-file", suffix)
bucketConfFile := cmd.Flag(fileFlagName, fmt.Sprintf("Path to YAML file that contains object store%s configuration.", suffix)).
PlaceHolder("<bucket.config-yaml-path>").String()
contentFlagName := fmt.Sprintf("objstore%s.config", suffix)

bucketConf := cmd.Flag(fmt.Sprintf("objstore%s.config", suffix), fmt.Sprintf("Alternative to '%s' flag. Object store%s configuration in YAML.", fileFlagName, suffix)).
help := fmt.Sprintf("Path to YAML file that contains object store%s configuration.", suffix)
help = strings.Join(append([]string{help}, extraDesc...), " ")
bucketConfFile := cmd.Flag(fileFlagName, help).PlaceHolder("<bucket.config-yaml-path>").String()

help = fmt.Sprintf("Alternative to '%s' flag. Object store%s configuration in YAML.", fileFlagName, suffix)
help = strings.Join(append([]string{help}, extraDesc...), " ")
bucketConf := cmd.Flag(contentFlagName, help).
PlaceHolder("<bucket.config-yaml>").String()

return &pathOrContent{
name: fmt.Sprintf("objstore%s.config", suffix),
fileFlagName: fileFlagName,
contentFlagName: contentFlagName,
required: required,

path: bucketConfFile,
content: bucketConf,
Expand Down
22 changes: 11 additions & 11 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

queries := cmd.Flag("query", "Addresses of statically configured query API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect query API servers through respective DNS lookups.").
PlaceHolder("<query>").Strings()
Expand Down Expand Up @@ -575,25 +575,25 @@ func runRule(
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil && err != client.ErrNotFound {
return err
}

if err == client.ErrNotFound {
var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand Down
Loading

0 comments on commit 7b72b6d

Please sign in to comment.