Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data races, simplify #2

Merged
merged 1 commit into from
Aug 21, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 32 additions & 67 deletions basic_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,82 +8,52 @@ type BasicRunner struct {
// modified.
Steps []Step

cancelCond *sync.Cond
cancelChs []chan<- bool
running bool
l sync.Mutex
cancelDone chan struct{}
runState runState
// l protects runState
l sync.Mutex
}

type runState int

const (
stateInitial = iota
stateRunning
stateCancelling
)

func (b *BasicRunner) Run(state map[string]interface{}) {
// Make sure we only run one at a time
b.l.Lock()
if b.running {
// Make sure we only run one instance at a time
if b.runState != stateInitial {
panic("already running")
}
b.cancelChs = nil
b.cancelCond = sync.NewCond(&sync.Mutex{})
b.running = true
b.cancelDone = make(chan struct{})
b.runState = stateRunning
b.l.Unlock()

// cancelReady is used to signal that the cancellation goroutine
// started and is waiting. The cancelEnded channel is used to
// signal the goroutine actually ended.
cancelReady := make(chan bool, 1)
cancelEnded := make(chan bool)
go func() {
b.cancelCond.L.Lock()
cancelReady <- true
b.cancelCond.Wait()
b.cancelCond.L.Unlock()

if b.cancelChs != nil {
state[StateCancelled] = true
}

cancelEnded <- true
}()

// Create the channel that we'll say we're done on in the case of
// interrupts here. We do this here so that this deferred statement
// runs last, so all the Cleanup methods are able to run.
// This runs after all of the cleanup steps so that we can notify any
// waiting Cancel callers and transition the state back to initial
defer func() {
b.l.Lock()
defer b.l.Unlock()

// Make sure the cancellation goroutine cleans up properly. This
// is a bit complicated. Basically, we first wait until the goroutine
// waiting for cancellation is actually waiting. Then we broadcast
// to it so it can unlock. Then we wait for it to tell us it finished.
<-cancelReady
b.cancelCond.L.Lock()
b.cancelCond.Broadcast()
b.cancelCond.L.Unlock()
<-cancelEnded

if b.cancelChs != nil {
for _, doneCh := range b.cancelChs {
doneCh <- true
}
}

b.running = false
b.runState = stateInitial
b.l.Unlock()
close(b.cancelDone)
}()

for _, step := range b.Steps {
// We also check for cancellation here since we can't be sure
// the goroutine that is running to set it actually ran.
if b.cancelChs != nil {
b.l.Lock()
if b.runState != stateRunning {
// We've been cancelled, update the state bag and abort
b.l.Unlock()
state[StateCancelled] = true
break
}
b.l.Unlock()

action := step.Run(state)
defer step.Cleanup(state)

if _, ok := state[StateCancelled]; ok {
break
}

if action == ActionHalt {
state[StateHalted] = true
break
Expand All @@ -93,20 +63,15 @@ func (b *BasicRunner) Run(state map[string]interface{}) {

func (b *BasicRunner) Cancel() {
b.l.Lock()

if !b.running {
// No-op if we're not running
if b.runState == stateInitial {
b.l.Unlock()
return
}

if b.cancelChs == nil {
b.cancelChs = make([]chan<- bool, 0, 5)
}

done := make(chan bool)
b.cancelChs = append(b.cancelChs, done)
b.cancelCond.Broadcast()
// Transition state from running to cancelling
b.runState = stateCancelling
b.l.Unlock()

<-done
// Wait until all of the cleanup hooks have been run
<-b.cancelDone
}
37 changes: 11 additions & 26 deletions basic_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multistep
import (
"reflect"
"testing"
"time"
)

func TestBasicRunner_ImplRunner(t *testing.T) {
Expand Down Expand Up @@ -77,36 +76,18 @@ func TestBasicRunner_Run_Halt(t *testing.T) {
}

func TestBasicRunner_Cancel(t *testing.T) {
ch := make(chan chan bool)
data := make(map[string]interface{})
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
stepInt := &TestStepSync{ch}
sync := make(chan struct{})
stepInt := &TestStepSync{C: sync}
stepC := &TestStepAcc{Data: "c"}

r := &BasicRunner{Steps: []Step{stepA, stepB, stepInt, stepC}}
go r.Run(data)

// Wait until we reach the sync point
responseCh := <-ch

// Cancel then continue chain
cancelCh := make(chan bool)
go func() {
r.Cancel()
cancelCh <- true
}()

for {
if _, ok := data[StateCancelled]; ok {
responseCh <- true
break
}

time.Sleep(10 * time.Millisecond)
}

<-cancelCh
sync <- struct{}{} // continue stepInt
r.Cancel()

// Test run data
expected := []string{"a", "b"}
Expand All @@ -122,9 +103,13 @@ func TestBasicRunner_Cancel(t *testing.T) {
t.Errorf("unexpected result: %#v", results)
}

// Test that it says it is cancelled
cancelled := data[StateCancelled].(bool)
if !cancelled {
// Test that the sync cleanup had the cancelled flag
if _, ok := data["sync_cancelled"]; !ok {
t.Errorf("sync cleanup not cancelled")
}

// Test that it says it was cancelled
if _, ok := data[StateCancelled]; !ok {
t.Errorf("not cancelled")
}
}
25 changes: 15 additions & 10 deletions multistep_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package multistep

import (
"runtime"
)

// A step for testing that accumuluates data into a string slice in the
// the state bag. It always uses the "data" key in the state bag, and will
// initialize it.
Expand All @@ -11,11 +15,6 @@ type TestStepAcc struct {
Halt bool
}

// A step that syncs by sending a channel and expecting a response.
type TestStepSync struct {
Ch chan chan bool
}

func (s TestStepAcc) Run(state map[string]interface{}) StepAction {
s.insertData(state, "data")

Expand All @@ -40,12 +39,18 @@ func (s TestStepAcc) insertData(state map[string]interface{}, key string) {
state[key] = data
}

func (s TestStepSync) Run(map[string]interface{}) StepAction {
ch := make(chan bool)
s.Ch <- ch
<-ch
type TestStepSync struct {
C <-chan struct{}
}

func (s TestStepSync) Run(map[string]interface{}) StepAction {
<-s.C
runtime.Gosched() // ensure that any calls to cancel have a chance to run
return ActionContinue
}

func (s TestStepSync) Cleanup(map[string]interface{}) {}
func (s TestStepSync) Cleanup(state map[string]interface{}) {
if _, ok := state[StateCancelled]; ok {
state["sync_cancelled"] = true
}
}