Skip to content

Commit

Permalink
Remove output channel from migrator
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Sep 3, 2020
1 parent 95e653d commit 2436ac8
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package migrator

import (
"sync"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
Expand All @@ -30,12 +32,9 @@ import (
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/atomic"
"go.uber.org/zap"
)

const workerChannelSize = 256

type worker struct {
persistManager persist.Manager
taskOptions migration.TaskOptions
Expand Down Expand Up @@ -132,13 +131,14 @@ func (m *Migrator) Run(ctx context.Context) error {
workers = append(workers, worker)
}

// Start up workers. Intentionally not using sync.WaitGroup so we can know when the last worker
// is finishing so that we can close the output channel.
// Start up workers.
var (
activeWorkers = atomic.NewUint32(uint32(len(workers)))
outputCh = make(chan completedMigration, len(candidates))
wg = &sync.WaitGroup{}
candidatesPerWorker = len(candidates) / numWorkers
candidateIdx = 0

completedMigrationsLock = &sync.Mutex{}
completedMigrations = make([]completedMigration, 0, len(candidates))
)
for i, worker := range workers {
endIdx := candidateIdx + candidatesPerWorker
Expand All @@ -148,19 +148,25 @@ func (m *Migrator) Run(ctx context.Context) error {

worker := worker
startIdx := candidateIdx // Capture current candidateIdx value for goroutine
wg.Add(1)
go func() {
m.startWorker(worker, candidates[startIdx:endIdx], outputCh)
if activeWorkers.Dec() == 0 {
close(outputCh)
}
output := m.startWorker(worker, candidates[startIdx:endIdx])

completedMigrationsLock.Lock()
completedMigrations = append(completedMigrations, output...)
completedMigrationsLock.Unlock()

wg.Done()
}()

candidateIdx = endIdx
}

// Wait until all workers have finished and migration results have been consumed
// Wait until all workers have finished and completedMigrations has been updated
wg.Wait()

migrationResults := make(map[mergeKey]fs.ReadInfoFileResult, len(candidates))
for result := range outputCh {
for _, result := range completedMigrations {
migrationResults[result.key] = result.updatedInfoFileResult
}

Expand Down Expand Up @@ -192,7 +198,8 @@ func (m *Migrator) findMigrationCandidates() []migrationCandidate {
return candidates
}

func (m *Migrator) startWorker(worker *worker, candidates []migrationCandidate, outputCh chan<- completedMigration) {
func (m *Migrator) startWorker(worker *worker, candidates []migrationCandidate) []completedMigration {
output := make([]completedMigration, 0, len(candidates))
for _, candidate := range candidates {
task, err := candidate.newTaskFn(worker.taskOptions.
SetInfoFileResult(candidate.infoFileResult).
Expand All @@ -209,16 +216,19 @@ func (m *Migrator) startWorker(worker *worker, candidates []migrationCandidate,
infoFileResult, err := task.Run()
if err != nil {
m.log.Error("error running migration task", zap.Error(err))
}
outputCh <- completedMigration{
key: mergeKey{
metadata: candidate.metadata,
shard: candidate.shard,
blockStart: candidate.infoFileResult.Info.BlockStart,
},
updatedInfoFileResult: infoFileResult,
} else {
output = append(output, completedMigration{
key: mergeKey{
metadata: candidate.metadata,
shard: candidate.shard,
blockStart: candidate.infoFileResult.Info.BlockStart,
},
updatedInfoFileResult: infoFileResult,
})
}
}

return output
}

// mergeUpdatedInfoFiles takes all ReadInfoFileResults updated by a migration and merges them back
Expand Down

0 comments on commit 2436ac8

Please sign in to comment.