Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: add concurrency to group compact #898

Closed
wants to merge 4 commits into from

Conversation

earthdiaosi
Copy link

Changes

Allow for compacting of group in parallel.

Verification

Tested by running thanos compact to see that compacting much quicker when concurrency is added.

@earthdiaosi
Copy link
Author

@bwplotka please review

Copy link
Member

@GiedriusS GiedriusS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM my comments that I've deleted - I missed some sneaky braces. LGTM from a purely mechanical side.

@earthdiaosi earthdiaosi force-pushed the master branch 2 times, most recently from afca3ac to de76399 Compare March 14, 2019 10:18
@earthdiaosi
Copy link
Author

@GiedriusS i have fixed the conflicts, please review

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we need something more complex than this.

  1. We need to ensure directory is different (that might be the case already)
  2. We have to limit the number of concurrent go routines. Essentially the main problem with compactor is not that it's too slow for typical cases. It's that it uses too much memory. We need to optimize the compaction process for it at some point, but essentially even single run for bigger project can take 30-50GB of RAM or sometimes more if people have milions of series.

This change will hit those users as now instead of "just" 60GB they will need 60GB * number of individual groups they have. For example for us it 30 different ones. So 60 * 30 GB suddenly with this PR merged. It's quite memory intensive part.

So I would suggest having something like this but have configurable number of workers to do it concurrently if user wish so and he has enough memory for their blocks. What do you think? (:

pkg/compact/compact.go Show resolved Hide resolved
@earthdiaosi
Copy link
Author

@bwplotka @GiedriusS Please review again

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some question.

Also what do you think about moving to errgroup.Group instead?

errChan := make(chan error, len(groups))
groupChan := make(chan struct{}, groupCompactConcurrency)
defer close(groupChan)
for i := 0; i < groupCompactConcurrency; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am reviewing on mobile, so might miss something, but I don't get this loop. What is the purpose of it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think I get it, but seem my comment on line 917

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for update! but I think there is still some work to be done.. @earthdiaosi let me know what do you think!

@@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just concurrency would be enough.

Suggested change
func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
func (c *BucketCompactor) Compact(ctx context.Context, concurrency int) error {

}
}
return errors.Wrap(err, "compaction")
errChan <- errors.Wrap(err, "compaction")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use errgroup.Group instead?

@@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
groupCompactConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting group.").

@@ -206,7 +211,7 @@ func runCompact(

ctx, cancel := context.WithCancel(context.Background())
f := func() error {
if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
if err := compactor.Compact(ctx, concurrency); err != nil {

errChan := make(chan error, len(groups))
groupChan := make(chan struct{}, groupCompactConcurrency)
defer close(groupChan)
for i := 0; i < groupCompactConcurrency; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think I get it, but seem my comment on line 917

shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocker: Non-thread-safe memory accessed by mulitple go routines.

@@ -913,22 +913,42 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
return errors.Wrap(err, "build compaction groups")
}
finishedAllGroups := true
var wg sync.WaitGroup
errChan := make(chan error, len(groups))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need something more similiar to this https://github.com/improbable-eng/thanos/pull/887/files still.
This is for consistency and readability reasons. so:

  1. Run concurrency number of go routines BEFORE for{ loop (891 line)
  2. Do the whole sync Iterate over groups
   for _, g := range groups {
````. 
3. Send `g` via some channel with 0 queue.
4. In go routines do the g (`g.Compact`) and mark `finishedAllGroups` to false if there are still some group to run. Make sure to make it `atomic.Value` or use `sync.Mutex`. 
5. After ` for _, g := range groups {` close channel and wait for all groups to finish. If `finishedAllGroups == true` return otherwise continue

@mjd95 mjd95 mentioned this pull request Apr 4, 2019
@mjd95
Copy link
Contributor

mjd95 commented Apr 4, 2019

@earthdiaosi thanks for raising this PR! I think it's a good optional addition, and can be a very helpful optimisation in some cases.

We are working through a large backlog of data and this PR is very helpful for us. Since we need to use it as soon as possible, I've addressed @bwplotka's comments and raised another PR (#1010) based on top of yours addressing them. I hope that's okay!

@bwplotka
Copy link
Member

bwplotka commented Apr 4, 2019

Closing to avoid confusion, hope it's ok with you @earthdiaosi ! Let us know if not, good work with starting with this!

@bwplotka bwplotka closed this Apr 4, 2019
@earthdiaosi
Copy link
Author

@mjd95 @bwplotka it's ok with me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants