log: Clean-up operation loop #2294

Merged
merged 9 commits on Feb 1, 2021
log/operation_manager.go: 229 changes (101 additions, 128 deletions)
@@ -23,7 +23,6 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
@@ -32,6 +31,7 @@ import (
"github.com/google/trillian/storage"
"github.com/google/trillian/util/clock"
"github.com/google/trillian/util/election"
"golang.org/x/sync/semaphore"
)

var (
@@ -85,7 +85,7 @@ type OperationInfo struct {

// The following parameters are passed to individual Operations.

// BatchSize is the processing batch size to be passed to tasks run by this manager
// BatchSize is the batch size to be passed to tasks run by this manager.
BatchSize int
// TimeSource should be used by the Operation to allow mocking for tests.
TimeSource clock.TimeSource
@@ -111,7 +111,7 @@ type OperationInfo struct {
type OperationManager struct {
info OperationInfo

// logOperation is the task that gets run across active logs in the scheduling loop
// logOperation is the task that gets run for active logs.
logOperation Operation

// runnerWG groups all goroutines with election Runners.
@@ -139,29 +139,34 @@ func NewOperationManager(info OperationInfo, logOperation Operation) *OperationManager {
if info.Timeout == 0 {
info.Timeout = DefaultTimeout
}
tracker := election.NewMasterTracker(nil, func(id string, v bool) {
val := 0.0
if v {
val = 1.0
}
isMaster.Set(val, id)
})
return &OperationManager{
info: info,
logOperation: logOperation,
runnerCancels: make(map[string]context.CancelFunc),
pendingResignations: make(chan election.Resignation, 100),
tracker: tracker,
logNames: make(map[int64]string),
}
}

// getActiveLogIDs returns IDs of all currently active logs, regardless of
// mastership status.
// getActiveLogIDs returns IDs of logs eligible for sequencing.
func (o *OperationManager) getActiveLogIDs(ctx context.Context) ([]int64, error) {
tx, err := o.info.Registry.LogStorage.Snapshot(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create transaction: %v", err)
}
defer tx.Close()

logIDs, err := tx.GetActiveLogIDs(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get active logIDs: %v", err)
}

if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit: %v", err)
}
@@ -217,21 +222,12 @@ func (o *OperationManager) masterFor(ctx context.Context, allIDs []int64) ([]int64, error) {
s := strconv.FormatInt(id, 10)
allStringIDs = append(allStringIDs, s)
}
if o.tracker == nil {
glog.Infof("creating mastership tracker for %v", allIDs)
o.tracker = election.NewMasterTracker(allStringIDs, func(id string, v bool) {
val := 0.0
if v {
val = 1.0
}
isMaster.Set(val, id)
})
}

// Synchronize the set of log IDs with those we are tracking mastership for.
for _, logID := range allStringIDs {
knownLogs.Set(1, logID)
if o.runnerCancels[logID] == nil {
o.tracker.Set(logID, false) // Initialise tracking for this ID.
o.runnerCancels[logID] = o.runElectionWithRestarts(ctx, logID)
}
}
@@ -329,19 +325,14 @@ func (o *OperationManager) getLogsAndExecutePass(ctx context.Context) error {
}
o.updateHeldIDs(ctx, logIDs, activeIDs)

// TODO(pavelkalinnikov): Run executor once instead of doing it on each pass.
// This will be also needed when factoring out per-log operation loop.
ex := newExecutor(o.logOperation, &o.info, len(logIDs))
// Put logIDs that need to be processed to the executor's channel.
for _, logID := range logIDs {
ex.jobs <- logID
}
close(ex.jobs) // Cause executor's run to terminate when it has drained the jobs.
ex.run(runCtx)
executePassForAll(runCtx, &o.info, o.logOperation, logIDs)
return nil
}

// OperationSingle performs a single pass of the manager.
//
// TODO(pavelkalinnikov): Deprecate this because it doesn't clean up any state,
// and is used only for testing.
func (o *OperationManager) OperationSingle(ctx context.Context) {
if err := o.getLogsAndExecutePass(ctx); err != nil {
glog.Errorf("failed to perform operation: %v", err)
@@ -353,52 +344,12 @@ func (o *OperationManager) OperationSingle(ctx context.Context) {
func (o *OperationManager) OperationLoop(ctx context.Context) {
glog.Infof("Log operation manager starting")

// Outer loop, runs until terminated
loop:
// Outer loop, runs until terminated.
for {
// TODO(alcutter): want a child context with deadline here?
start := o.info.TimeSource.Now()
if err := o.getLogsAndExecutePass(ctx); err != nil {
// Suppress the error if ctx is done (ctx.Err != nil) as we're exiting.
if ctx.Err() != nil {
glog.Errorf("failed to execute operation on logs: %v", err)
}
}
glog.V(1).Infof("Log operation manager pass complete")

// Process any pending resignations while there's no activity.
doneResigning := false
for !doneResigning {
select {
case r := <-o.pendingResignations:
resignations.Inc(r.ID)
r.Execute(ctx)
default:
doneResigning = true
}
}

// See if it's time to quit
select {
case <-ctx.Done():
if err := o.operateOnce(ctx); err != nil {
glog.Infof("Log operation manager shutting down")
break loop
default:
break
}

// Wait for the configured time before going for another pass
duration := o.info.TimeSource.Now().Sub(start)
wait := o.info.RunInterval - duration
if wait > 0 {
glog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
if err := clock.SleepContext(ctx, wait); err != nil {
glog.Infof("Log operation manager shutting down")
break loop
}
} else {
glog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
}

}

// Terminate all the election Runners.
Expand All @@ -421,84 +372,106 @@ loop:
glog.Infof("wait for termination of election runners...done")
}

// logOperationExecutor runs the specified Operation on the submitted logs
// in a set of parallel workers.
type logOperationExecutor struct {
op Operation
info *OperationInfo

// jobs holds logIDs to run log operation on.
// TODO(pavelkalinnikov): Use mastership context for each job to make them
// auto-cancelable when mastership is lost.
// TODO(pavelkalinnikov): Report job completion status back.
jobs chan int64
}
// operateOnce runs a single round of operation for each of the active logs
// that this instance is master for. Returns an error only if the context is
// canceled, i.e. the operation is being shut down.
func (o *OperationManager) operateOnce(ctx context.Context) error {
// TODO(alcutter): want a child context with deadline here?
start := o.info.TimeSource.Now()
if err := o.getLogsAndExecutePass(ctx); err != nil {
// Suppress the error if ctx is done (ctx.Err != nil) as we're exiting.
if ctx.Err() != nil {
glog.Errorf("failed to execute operation on logs: %v", err)
}
}
glog.V(1).Infof("Log operation manager pass complete")

func newExecutor(op Operation, info *OperationInfo, jobs int) *logOperationExecutor {
if jobs < 0 {
jobs = 0
// Process any pending resignations while there's no activity.
doneResigning := false
for !doneResigning {
select {
case r := <-o.pendingResignations:
resignations.Inc(r.ID)
r.Execute(ctx)
default:
doneResigning = true
}
}

// See if it's time to quit.
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Wait for the configured time before going for another pass.
duration := o.info.TimeSource.Now().Sub(start)
wait := o.info.RunInterval - duration
if wait > 0 {
glog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
if err := clock.SleepContext(ctx, wait); err != nil {
return err
}
} else {
glog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
}
return &logOperationExecutor{op: op, info: info, jobs: make(chan int64, jobs)}
return nil
}

// run sets off a collection of transient worker goroutines which process the
// pending log operation jobs until the jobs channel is closed.
func (e *logOperationExecutor) run(ctx context.Context) {
startBatch := e.info.TimeSource.Now()
// executePassForAll runs ExecutePass of the given operation for each of the
// passed-in logs, allowing up to a configurable number of parallel operations.
func executePassForAll(ctx context.Context, info *OperationInfo, op Operation, logIDs []int64) {
startBatch := info.TimeSource.Now()

numWorkers := e.info.NumWorkers
numWorkers := info.NumWorkers
if numWorkers <= 0 {
glog.Warning("Running executor with NumWorkers <= 0, assuming 1")
numWorkers = 1
}
glog.V(1).Infof("Running executor with %d worker(s)", numWorkers)

sem := semaphore.NewWeighted(int64(numWorkers))
var wg sync.WaitGroup
var successCount, failCount, itemCount int64

for i := 0; i < numWorkers; i++ {
for _, logID := range logIDs {
if err := sem.Acquire(ctx, 1); err != nil {
break // Terminate because the context is canceled.
}
wg.Add(1)
go func() {
go func(logID int64) {
defer wg.Done()
for {
logID, ok := <-e.jobs
if !ok {
return
}

label := strconv.FormatInt(logID, 10)
start := e.info.TimeSource.Now()
count, err := e.op.ExecutePass(ctx, logID, e.info)
if err != nil {
glog.Errorf("ExecutePass(%v) failed: %v", logID, err)
failedSigningRuns.Inc(label)
atomic.AddInt64(&failCount, 1)
continue
}

// This indicates signing activity is proceeding on the logID.
signingRuns.Inc(label)
if count > 0 {
d := clock.SecondsSince(e.info.TimeSource, start)
glog.Infof("%v: processed %d items in %.2f seconds (%.2f qps)", logID, count, d, float64(count)/d)
entriesAdded.Add(float64(count), label)
batchesAdded.Inc(label)
} else {
glog.V(1).Infof("%v: no items to process", logID)
}

atomic.AddInt64(&successCount, 1)
atomic.AddInt64(&itemCount, int64(count))
defer sem.Release(1)
if err := executePass(ctx, info, op, logID); err != nil {
glog.Errorf("ExecutePass(%v) failed: %v", logID, err)
}
}()
}(logID)
}

// Wait for the workers to consume all of the logIDs.
wg.Wait()
d := clock.SecondsSince(e.info.TimeSource, startBatch)
if itemCount > 0 {
glog.Infof("Group run completed in %.2f seconds: %v succeeded, %v failed, %v items processed", d, successCount, failCount, itemCount)
d := clock.SecondsSince(info.TimeSource, startBatch)
glog.V(1).Infof("Group run completed in %.2f seconds", d)
}

// executePass runs ExecutePass of the given operation for the passed-in log.
func executePass(ctx context.Context, info *OperationInfo, op Operation, logID int64) error {
label := strconv.FormatInt(logID, 10)
start := info.TimeSource.Now()
count, err := op.ExecutePass(ctx, logID, info)
if err != nil {
failedSigningRuns.Inc(label)
return err
}

// This indicates signing activity is proceeding on the logID.
signingRuns.Inc(label)
if count > 0 {
d := clock.SecondsSince(info.TimeSource, start)
glog.Infof("%v: processed %d items in %.2f seconds (%.2f qps)", logID, count, d, float64(count)/d)
entriesAdded.Add(float64(count), label)
batchesAdded.Inc(label)
} else {
glog.V(1).Infof("Group run completed in %.2f seconds: no items to process", d)
glog.V(1).Infof("%v: no items to process", logID)
}
return nil
}
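
For reference, here is a minimal standalone sketch of the bounded-parallelism pattern that the new executePassForAll uses: golang.org/x/sync/semaphore caps the number of concurrent passes and sync.WaitGroup waits for them to finish. The processLog function and the worker count below are illustrative placeholders, not part of the Trillian API.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// processLog is a stand-in for running Operation.ExecutePass on one log.
func processLog(ctx context.Context, logID int64) error {
	fmt.Printf("processing log %d\n", logID)
	return nil
}

func main() {
	ctx := context.Background()
	logIDs := []int64{1, 2, 3, 4, 5}
	const numWorkers = 2 // Cap on concurrent passes, analogous to OperationInfo.NumWorkers.

	sem := semaphore.NewWeighted(numWorkers)
	var wg sync.WaitGroup
	for _, logID := range logIDs {
		// Block until a worker slot is free; stop scheduling if the context is canceled.
		if err := sem.Acquire(ctx, 1); err != nil {
			break
		}
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			defer sem.Release(1)
			if err := processLog(ctx, id); err != nil {
				fmt.Printf("log %d failed: %v\n", id, err)
			}
		}(logID)
	}
	// Wait for all in-flight passes to complete.
	wg.Wait()
}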