Skip to content

Commit

Permalink
Avoid "break label" by factoring out a method
Browse files Browse the repository at this point in the history
  • Loading branch information
pav-kv committed Jan 28, 2021
1 parent 5324173 commit 4b5b25f
Showing 1 changed file with 52 additions and 42 deletions.
94 changes: 52 additions & 42 deletions log/operation_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ func (o *OperationManager) getLogsAndExecutePass(ctx context.Context) error {
}

// OperationSingle performs a single pass of the manager.
//
// TODO(pavelkalinnikov): Deprecate this because it doesn't clean up any state,
// and is used only for testing.
func (o *OperationManager) OperationSingle(ctx context.Context) {
if err := o.getLogsAndExecutePass(ctx); err != nil {
glog.Errorf("failed to perform operation: %v", err)
Expand All @@ -342,51 +345,11 @@ func (o *OperationManager) OperationLoop(ctx context.Context) {
glog.Infof("Log operation manager starting")

// Outer loop, runs until terminated.
loop:
for {
// TODO(alcutter): want a child context with deadline here?
start := o.info.TimeSource.Now()
if err := o.getLogsAndExecutePass(ctx); err != nil {
// Suppress the error if ctx is done (ctx.Err != nil) as we're exiting.
if ctx.Err() != nil {
glog.Errorf("failed to execute operation on logs: %v", err)
}
}
glog.V(1).Infof("Log operation manager pass complete")

// Process any pending resignations while there's no activity.
doneResigning := false
for !doneResigning {
select {
case r := <-o.pendingResignations:
resignations.Inc(r.ID)
r.Execute(ctx)
default:
doneResigning = true
}
}

// See if it's time to quit.
select {
case <-ctx.Done():
if err := o.operateOnce(ctx); err != nil {
glog.Infof("Log operation manager shutting down")
break loop
default:
break
}

// Wait for the configured time before going for another pass.
duration := o.info.TimeSource.Now().Sub(start)
wait := o.info.RunInterval - duration
if wait > 0 {
glog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
if err := clock.SleepContext(ctx, wait); err != nil {
glog.Infof("Log operation manager shutting down")
break loop
}
} else {
glog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
}

}

// Terminate all the election Runners.
Expand All @@ -409,6 +372,53 @@ loop:
glog.Infof("wait for termination of election runners...done")
}

// operateOnce runs a single round of operation for each of the active logs
// that this instance is master for. Returns an error only if the context is
// canceled, i.e. the operation is being shut down.
func (o *OperationManager) operateOnce(ctx context.Context) error {
// TODO(alcutter): want a child context with deadline here?
start := o.info.TimeSource.Now()
if err := o.getLogsAndExecutePass(ctx); err != nil {
// Suppress the error if ctx is done (ctx.Err != nil) as we're exiting.
if ctx.Err() != nil {
glog.Errorf("failed to execute operation on logs: %v", err)
}
}
glog.V(1).Infof("Log operation manager pass complete")

// Process any pending resignations while there's no activity.
doneResigning := false
for !doneResigning {
select {
case r := <-o.pendingResignations:
resignations.Inc(r.ID)
r.Execute(ctx)
default:
doneResigning = true
}
}

// See if it's time to quit.
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Wait for the configured time before going for another pass.
duration := o.info.TimeSource.Now().Sub(start)
wait := o.info.RunInterval - duration
if wait > 0 {
glog.V(1).Infof("Processing started at %v for %v; wait %v before next run", start, duration, wait)
if err := clock.SleepContext(ctx, wait); err != nil {
return err
}
} else {
glog.V(1).Infof("Processing started at %v for %v; start next run immediately", start, duration)
}
return nil
}

// executePassForAll runs ExecutePass of the given operation for each of the
// passed-in logs, allowing up to a configurable number of parallel operations.
func executePassForAll(ctx context.Context, info *OperationInfo, op Operation, logIDs []int64) {
Expand Down

0 comments on commit 4b5b25f

Please sign in to comment.