diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index dd462d6b51..221901f7e5 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). Default("20").Int() + groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group."). + Default("1").Int() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -112,6 +115,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *disableDownsampling, *maxCompactionLevel, *blockSyncConcurrency, + *groupCompactConcurrency, ) } } @@ -131,6 +135,7 @@ func runCompact( disableDownsampling bool, maxCompactionLevel int, blockSyncConcurrency int, + groupCompactConcurrency int, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -206,7 +211,7 @@ func runCompact( ctx, cancel := context.WithCancel(context.Background()) f := func() error { - if err := compactor.Compact(ctx); err != nil { + if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil { return errors.Wrap(err, "compaction failed") } level.Info(logger).Log("msg", "compaction iterations done") diff --git a/docs/components/compact.md b/docs/components/compact.md index 2eb6a99824..668c121d08 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -70,5 +70,7 @@ Flags: --block-sync-concurrency=20 Number of goroutines to use when syncing block metadata from object storage. + --group-compact-concurrency=1 + Number of goroutines to use when compacting group. ``` diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ca05adfb4a..0ed9582b37 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp } // Compact runs compaction over bucket. -func (c *BucketCompactor) Compact(ctx context.Context) error { +func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error { // Loop over bucket and compact until there's no work left. for { // Clean up the compaction temporary directory at the beginning of every compaction loop. @@ -913,22 +913,42 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { return errors.Wrap(err, "build compaction groups") } finishedAllGroups := true + var wg sync.WaitGroup + errChan := make(chan error, len(groups)) + groupChan := make(chan struct{}, groupCompactConcurrency) + defer close(groupChan) + for i := 0; i < groupCompactConcurrency; i++ { + groupChan <- struct{}{} + } for _, g := range groups { - shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) - if err == nil { - if shouldRerunGroup { - finishedAllGroups = false + <-groupChan + wg.Add(1) + go func(g *Group) { + defer func() { + wg.Done() + groupChan <- struct{}{} + }() + shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) + if err == nil { + if shouldRerunGroup { + finishedAllGroups = false + } + return } - continue - } - if IsIssue347Error(err) { - if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { - finishedAllGroups = false - continue + if IsIssue347Error(err) { + if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { + finishedAllGroups = false + return + } } - } - return errors.Wrap(err, "compaction") + errChan <- errors.Wrap(err, "compaction") + }(g) + } + wg.Wait() + close(errChan) + if err := <-errChan; err != nil { + return err } if finishedAllGroups { break