From 609af48275d2320160fb12d6d1087e5178457b4c Mon Sep 17 00:00:00 2001 From: wangyaqiang1 Date: Thu, 14 Mar 2019 18:48:31 +0800 Subject: [PATCH 1/4] compact: add concurrency to group compact --- pkg/compact/compact.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ca05adfb4a..6190c57197 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -913,22 +913,33 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { return errors.Wrap(err, "build compaction groups") } finishedAllGroups := true + var wg sync.WaitGroup + errChan := make(chan error, len(groups)) for _, g := range groups { - shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) - if err == nil { - if shouldRerunGroup { - finishedAllGroups = false + wg.Add(1) + go func(g *Group) { + defer wg.Done() + shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) + if err == nil { + if shouldRerunGroup { + finishedAllGroups = false + } + return } - continue - } - if IsIssue347Error(err) { - if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { - finishedAllGroups = false - continue + if IsIssue347Error(err) { + if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { + finishedAllGroups = false + return + } } - } - return errors.Wrap(err, "compaction") + errChan <- errors.Wrap(err, "compaction") + }(g) + } + wg.Wait() + close(errChan) + if err := <-errChan; err != nil { + return err } if finishedAllGroups { break From ae90bf09d4ee8ca550ab04bdba4a00283393db28 Mon Sep 17 00:00:00 2001 From: wangyaqiang1 Date: Wed, 20 Mar 2019 18:33:44 +0800 Subject: [PATCH 2/4] add flag to controll the number of goroutines to use when compacting group --- cmd/thanos/compact.go | 7 ++++++- pkg/compact/compact.go | 14 +++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index dd462d6b51..221901f7e5 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). Default("20").Int() + groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group."). + Default("1").Int() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -112,6 +115,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *disableDownsampling, *maxCompactionLevel, *blockSyncConcurrency, + *groupCompactConcurrency, ) } } @@ -131,6 +135,7 @@ func runCompact( disableDownsampling bool, maxCompactionLevel int, blockSyncConcurrency int, + groupCompactConcurrency int, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -206,7 +211,7 @@ func runCompact( ctx, cancel := context.WithCancel(context.Background()) f := func() error { - if err := compactor.Compact(ctx); err != nil { + if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil { return errors.Wrap(err, "compaction failed") } level.Info(logger).Log("msg", "compaction iterations done") diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6190c57197..dcebcbc28a 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp } // Compact runs compaction over bucket. -func (c *BucketCompactor) Compact(ctx context.Context) error { +func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error { // Loop over bucket and compact until there's no work left. for { // Clean up the compaction temporary directory at the beginning of every compaction loop. @@ -915,10 +915,18 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { finishedAllGroups := true var wg sync.WaitGroup errChan := make(chan error, len(groups)) + groupChan := make(chan struct{}, groupCompactConcurrency) + defer close(groupChan) + for i := 0; i < groupCompactConcurrency; i++ { + groupChan <- struct{}{} + } for _, g := range groups { - wg.Add(1) + <-groupChan go func(g *Group) { - defer wg.Done() + defer func() { + wg.Done() + groupChan <- struct{}{} + }() shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) if err == nil { if shouldRerunGroup { From d3aa040ff07ce210b4bf567541c6ac2a795959cc Mon Sep 17 00:00:00 2001 From: wangyaqiang1 Date: Thu, 21 Mar 2019 11:45:27 +0800 Subject: [PATCH 3/4] update compact.md for group-compact-concurrency --- docs/components/compact.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/components/compact.md b/docs/components/compact.md index 2eb6a99824..668c121d08 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -70,5 +70,7 @@ Flags: --block-sync-concurrency=20 Number of goroutines to use when syncing block metadata from object storage. + --group-compact-concurrency=1 + Number of goroutines to use when compacting group. ``` From 4d802ce6a74f728031b12f124e4953b59fb681d6 Mon Sep 17 00:00:00 2001 From: wangyaqiang1 Date: Thu, 21 Mar 2019 11:54:35 +0800 Subject: [PATCH 4/4] fixed: miss wg.Add() --- pkg/compact/compact.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index dcebcbc28a..0ed9582b37 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -922,6 +922,7 @@ func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency i } for _, g := range groups { <-groupChan + wg.Add(1) go func(g *Group) { defer func() { wg.Done()