Skip to content

Commit

Permalink
Fix data races, simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
titanous committed Jun 28, 2013
1 parent 9a4404f commit f61e211
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 103 deletions.
99 changes: 32 additions & 67 deletions basic_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,82 +8,52 @@ type BasicRunner struct {
// modified.
Steps []Step

cancelCond *sync.Cond
cancelChs []chan<- bool
running bool
l sync.Mutex
cancelDone chan struct{}
runState runState
// l protects runState
l sync.Mutex
}

type runState int

const (
stateInitial = iota
stateRunning
stateCancelling
)

func (b *BasicRunner) Run(state map[string]interface{}) {
// Make sure we only run one at a time
b.l.Lock()
if b.running {
// Make sure we only run one instance at a time
if b.runState != stateInitial {
panic("already running")
}
b.cancelChs = nil
b.cancelCond = sync.NewCond(&sync.Mutex{})
b.running = true
b.cancelDone = make(chan struct{})
b.runState = stateRunning
b.l.Unlock()

// cancelReady is used to signal that the cancellation goroutine
// started and is waiting. The cancelEnded channel is used to
// signal the goroutine actually ended.
cancelReady := make(chan bool, 1)
cancelEnded := make(chan bool)
go func() {
b.cancelCond.L.Lock()
cancelReady <- true
b.cancelCond.Wait()
b.cancelCond.L.Unlock()

if b.cancelChs != nil {
state[StateCancelled] = true
}

cancelEnded <- true
}()

// Create the channel that we'll say we're done on in the case of
// interrupts here. We do this here so that this deferred statement
// runs last, so all the Cleanup methods are able to run.
// This runs after all of the cleanup steps so that we can notify any
// waiting Cancel callers and transition the state back to initial
defer func() {
b.l.Lock()
defer b.l.Unlock()

// Make sure the cancellation goroutine cleans up properly. This
// is a bit complicated. Basically, we first wait until the goroutine
// waiting for cancellation is actually waiting. Then we broadcast
// to it so it can unlock. Then we wait for it to tell us it finished.
<-cancelReady
b.cancelCond.L.Lock()
b.cancelCond.Broadcast()
b.cancelCond.L.Unlock()
<-cancelEnded

if b.cancelChs != nil {
for _, doneCh := range b.cancelChs {
doneCh <- true
}
}

b.running = false
b.runState = stateInitial
b.l.Unlock()
close(b.cancelDone)
}()

for _, step := range b.Steps {
// We also check for cancellation here since we can't be sure
// the goroutine that is running to set it actually ran.
if b.cancelChs != nil {
b.l.Lock()
if b.runState != stateRunning {
// We've been cancelled, update the state bag and abort
b.l.Unlock()
state[StateCancelled] = true
break
}
b.l.Unlock()

action := step.Run(state)
defer step.Cleanup(state)

if _, ok := state[StateCancelled]; ok {
break
}

if action == ActionHalt {
state[StateHalted] = true
break
Expand All @@ -93,20 +63,15 @@ func (b *BasicRunner) Run(state map[string]interface{}) {

func (b *BasicRunner) Cancel() {
b.l.Lock()

if !b.running {
// No-op if we're not running
if b.runState == stateInitial {
b.l.Unlock()
return
}

if b.cancelChs == nil {
b.cancelChs = make([]chan<- bool, 0, 5)
}

done := make(chan bool)
b.cancelChs = append(b.cancelChs, done)
b.cancelCond.Broadcast()
// Transition state from running to cancelling
b.runState = stateCancelling
b.l.Unlock()

<-done
// Wait until all of the cleanup hooks have been run
<-b.cancelDone
}
37 changes: 11 additions & 26 deletions basic_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multistep
import (
"reflect"
"testing"
"time"
)

func TestBasicRunner_ImplRunner(t *testing.T) {
Expand Down Expand Up @@ -77,36 +76,18 @@ func TestBasicRunner_Run_Halt(t *testing.T) {
}

func TestBasicRunner_Cancel(t *testing.T) {
ch := make(chan chan bool)
data := make(map[string]interface{})
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
stepInt := &TestStepSync{ch}
sync := make(chan struct{})
stepInt := &TestStepSync{C: sync}
stepC := &TestStepAcc{Data: "c"}

r := &BasicRunner{Steps: []Step{stepA, stepB, stepInt, stepC}}
go r.Run(data)

// Wait until we reach the sync point
responseCh := <-ch

// Cancel then continue chain
cancelCh := make(chan bool)
go func() {
r.Cancel()
cancelCh <- true
}()

for {
if _, ok := data[StateCancelled]; ok {
responseCh <- true
break
}

time.Sleep(10 * time.Millisecond)
}

<-cancelCh
sync <- struct{}{} // continue stepInt
r.Cancel()

// Test run data
expected := []string{"a", "b"}
Expand All @@ -122,9 +103,13 @@ func TestBasicRunner_Cancel(t *testing.T) {
t.Errorf("unexpected result: %#v", results)
}

// Test that it says it is cancelled
cancelled := data[StateCancelled].(bool)
if !cancelled {
// Test that the sync cleanup had the cancelled flag
if _, ok := data["sync_cancelled"]; !ok {
t.Errorf("sync cleanup not cancelled")
}

// Test that it says it was cancelled
if _, ok := data[StateCancelled]; !ok {
t.Errorf("not cancelled")
}
}
25 changes: 15 additions & 10 deletions multistep_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package multistep

import (
"runtime"
)

// A step for testing that accumuluates data into a string slice in the
// the state bag. It always uses the "data" key in the state bag, and will
// initialize it.
Expand All @@ -11,11 +15,6 @@ type TestStepAcc struct {
Halt bool
}

// A step that syncs by sending a channel and expecting a response.
type TestStepSync struct {
Ch chan chan bool
}

func (s TestStepAcc) Run(state map[string]interface{}) StepAction {
s.insertData(state, "data")

Expand All @@ -40,12 +39,18 @@ func (s TestStepAcc) insertData(state map[string]interface{}, key string) {
state[key] = data
}

func (s TestStepSync) Run(map[string]interface{}) StepAction {
ch := make(chan bool)
s.Ch <- ch
<-ch
type TestStepSync struct {
C <-chan struct{}
}

func (s TestStepSync) Run(map[string]interface{}) StepAction {
<-s.C
runtime.Gosched() // ensure that any calls to cancel have a chance to run
return ActionContinue
}

func (s TestStepSync) Cleanup(map[string]interface{}) {}
func (s TestStepSync) Cleanup(state map[string]interface{}) {
if _, ok := state[StateCancelled]; ok {
state["sync_cancelled"] = true
}
}

0 comments on commit f61e211

Please sign in to comment.