
compact: add concurrency to group compact #898

Status: Closed (wants to merge 4 commits)
7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
@@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
Review comment (Member):

Suggested change:
-	groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
+	groupCompactConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting group.").

Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
@@ -112,6 +115,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
*groupCompactConcurrency,
)
}
}
@@ -131,6 +135,7 @@ func runCompact(
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
groupCompactConcurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
@@ -206,7 +211,7 @@

ctx, cancel := context.WithCancel(context.Background())
f := func() error {
if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
Review comment (Member):

Suggested change:
-		if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
+		if err := compactor.Compact(ctx, concurrency); err != nil {

return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")
2 changes: 2 additions & 0 deletions docs/components/compact.md
@@ -70,5 +70,7 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--group-compact-concurrency=1
Number of goroutines to use when compacting group.

```
46 changes: 33 additions & 13 deletions pkg/compact/compact.go
@@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
Review comment (Member):

I think just `concurrency` would be enough.

Suggested change:
-func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
+func (c *BucketCompactor) Compact(ctx context.Context, concurrency int) error {

// Loop over bucket and compact until there's no work left.
for {
// Clean up the compaction temporary directory at the beginning of every compaction loop.
@@ -913,22 +913,42 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
return errors.Wrap(err, "build compaction groups")
}
finishedAllGroups := true
var wg sync.WaitGroup
errChan := make(chan error, len(groups))
Review comment (Member):

I think we still need something more similar to this: https://github.com/improbable-eng/thanos/pull/887/files. This is for consistency and readability reasons, so:

1. Run `concurrency` number of goroutines BEFORE the `for {` loop (line 891).
2. Do the whole sync, then iterate over the groups: `for _, g := range groups {`.
3. Send `g` via some channel with 0 queue.
4. In the goroutines, run the group (`g.Compact`) and mark `finishedAllGroups` as false if there are still some groups to run. Make sure to make it an `atomic.Value` or use `sync.Mutex`.
5. After the `for _, g := range groups {` loop, close the channel and wait for all groups to finish. If `finishedAllGroups == true`, return; otherwise continue.
groupChan := make(chan struct{}, groupCompactConcurrency)
defer close(groupChan)
for i := 0; i < groupCompactConcurrency; i++ {
Review comment (Member):

I am reviewing on mobile, so might miss something, but I don't get this loop. What is the purpose of it?

Review comment (Member):

Ok, I think I get it, but see my comment on line 917.

groupChan <- struct{}{}
}
for _, g := range groups {
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
<-groupChan
wg.Add(1)
go func(g *Group) {
defer func() {
wg.Done()
groupChan <- struct{}{}
}()
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
Review comment (Member):

Blocker: non-thread-safe memory accessed by multiple goroutines.

}
return
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
finishedAllGroups = false
continue
if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
finishedAllGroups = false
return
}
}
}
return errors.Wrap(err, "compaction")
errChan <- errors.Wrap(err, "compaction")
Review comment (Member):

Can we use errgroup.Group instead?

}(g)
}
wg.Wait()
close(errChan)
if err := <-errChan; err != nil {
return err
}
if finishedAllGroups {
break