-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
compact: add concurrency to group compact #898
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri | |||||
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). | ||||||
Default("20").Int() | ||||||
|
||||||
groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group."). | ||||||
Default("1").Int() | ||||||
|
||||||
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { | ||||||
return runCompact(g, logger, reg, | ||||||
*httpAddr, | ||||||
|
@@ -112,6 +115,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri | |||||
*disableDownsampling, | ||||||
*maxCompactionLevel, | ||||||
*blockSyncConcurrency, | ||||||
*groupCompactConcurrency, | ||||||
) | ||||||
} | ||||||
} | ||||||
|
@@ -131,6 +135,7 @@ func runCompact( | |||||
disableDownsampling bool, | ||||||
maxCompactionLevel int, | ||||||
blockSyncConcurrency int, | ||||||
groupCompactConcurrency int, | ||||||
) error { | ||||||
halted := prometheus.NewGauge(prometheus.GaugeOpts{ | ||||||
Name: "thanos_compactor_halted", | ||||||
|
@@ -206,7 +211,7 @@ func runCompact( | |||||
|
||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
f := func() error { | ||||||
if err := compactor.Compact(ctx); err != nil { | ||||||
if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return errors.Wrap(err, "compaction failed") | ||||||
} | ||||||
level.Info(logger).Log("msg", "compaction iterations done") | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp | |||||
} | ||||||
|
||||||
// Compact runs compaction over bucket. | ||||||
func (c *BucketCompactor) Compact(ctx context.Context) error { | ||||||
func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think just
Suggested change
|
||||||
// Loop over bucket and compact until there's no work left. | ||||||
for { | ||||||
// Clean up the compaction temporary directory at the beginning of every compaction loop. | ||||||
|
@@ -913,22 +913,42 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { | |||||
return errors.Wrap(err, "build compaction groups") | ||||||
} | ||||||
finishedAllGroups := true | ||||||
var wg sync.WaitGroup | ||||||
errChan := make(chan error, len(groups)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need something more similiar to this https://github.com/improbable-eng/thanos/pull/887/files still.
|
||||||
groupChan := make(chan struct{}, groupCompactConcurrency) | ||||||
defer close(groupChan) | ||||||
for i := 0; i < groupCompactConcurrency; i++ { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am reviewing on mobile, so might miss something, but I don't get this loop. What is the purpose of it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I think I get it, but seem my comment on line 917 |
||||||
groupChan <- struct{}{} | ||||||
} | ||||||
for _, g := range groups { | ||||||
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) | ||||||
if err == nil { | ||||||
if shouldRerunGroup { | ||||||
finishedAllGroups = false | ||||||
<-groupChan | ||||||
wg.Add(1) | ||||||
go func(g *Group) { | ||||||
earthdiaosi marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
defer func() { | ||||||
wg.Done() | ||||||
groupChan <- struct{}{} | ||||||
}() | ||||||
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) | ||||||
if err == nil { | ||||||
if shouldRerunGroup { | ||||||
finishedAllGroups = false | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blocker: Non-thread-safe memory accessed by mulitple go routines. |
||||||
} | ||||||
return | ||||||
} | ||||||
continue | ||||||
} | ||||||
|
||||||
if IsIssue347Error(err) { | ||||||
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { | ||||||
finishedAllGroups = false | ||||||
continue | ||||||
if IsIssue347Error(err) { | ||||||
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { | ||||||
finishedAllGroups = false | ||||||
return | ||||||
} | ||||||
} | ||||||
} | ||||||
return errors.Wrap(err, "compaction") | ||||||
errChan <- errors.Wrap(err, "compaction") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use |
||||||
}(g) | ||||||
} | ||||||
wg.Wait() | ||||||
close(errChan) | ||||||
if err := <-errChan; err != nil { | ||||||
return err | ||||||
} | ||||||
if finishedAllGroups { | ||||||
break | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.