Skip to content

Commit

Permalink
Revert "Fix data races, simplify"
Browse files Browse the repository at this point in the history
This reverts commit f61e211.

This actually breaks the semantics of how the state is changed mid-step.
I realize it is a race but it is a race that Packer currently depends
on. I think the proper solution is to fix the state bag.

Sorry, @titanous, I'll address these soon.
  • Loading branch information
mitchellh committed Aug 31, 2013
1 parent e15472e commit da6303b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 58 deletions.
99 changes: 67 additions & 32 deletions basic_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,82 @@ type BasicRunner struct {
// modified.
Steps []Step

cancelDone chan struct{}
runState runState
// l protects runState
l sync.Mutex
cancelCond *sync.Cond
cancelChs []chan<- bool
running bool
l sync.Mutex
}

type runState int

const (
stateInitial = iota
stateRunning
stateCancelling
)

func (b *BasicRunner) Run(state map[string]interface{}) {
// Make sure we only run one at a time
b.l.Lock()
// Make sure we only run one instance at a time
if b.runState != stateInitial {
if b.running {
panic("already running")
}
b.cancelDone = make(chan struct{})
b.runState = stateRunning
b.cancelChs = nil
b.cancelCond = sync.NewCond(&sync.Mutex{})
b.running = true
b.l.Unlock()

// This runs after all of the cleanup steps so that we can notify any
// waiting Cancel callers and transition the state back to initial
// cancelReady is used to signal that the cancellation goroutine
// started and is waiting. The cancelEnded channel is used to
// signal the goroutine actually ended.
cancelReady := make(chan bool, 1)
cancelEnded := make(chan bool)
go func() {
b.cancelCond.L.Lock()
cancelReady <- true
b.cancelCond.Wait()
b.cancelCond.L.Unlock()

if b.cancelChs != nil {
state[StateCancelled] = true
}

cancelEnded <- true
}()

// Create the channel that we'll say we're done on in the case of
// interrupts here. We do this here so that this deferred statement
// runs last, so all the Cleanup methods are able to run.
defer func() {
b.l.Lock()
b.runState = stateInitial
b.l.Unlock()
close(b.cancelDone)
defer b.l.Unlock()

// Make sure the cancellation goroutine cleans up properly. This
// is a bit complicated. Basically, we first wait until the goroutine
// waiting for cancellation is actually waiting. Then we broadcast
// to it so it can unlock. Then we wait for it to tell us it finished.
<-cancelReady
b.cancelCond.L.Lock()
b.cancelCond.Broadcast()
b.cancelCond.L.Unlock()
<-cancelEnded

if b.cancelChs != nil {
for _, doneCh := range b.cancelChs {
doneCh <- true
}
}

b.running = false
}()

for _, step := range b.Steps {
b.l.Lock()
if b.runState != stateRunning {
// We've been cancelled, update the state bag and abort
b.l.Unlock()
// We also check for cancellation here since we can't be sure
// the goroutine that is running to set it actually ran.
if b.cancelChs != nil {
state[StateCancelled] = true
break
}
b.l.Unlock()

action := step.Run(state)
defer step.Cleanup(state)

if _, ok := state[StateCancelled]; ok {
break
}

if action == ActionHalt {
state[StateHalted] = true
break
Expand All @@ -63,15 +93,20 @@ func (b *BasicRunner) Run(state map[string]interface{}) {

func (b *BasicRunner) Cancel() {
b.l.Lock()
// No-op if we're not running
if b.runState == stateInitial {

if !b.running {
b.l.Unlock()
return
}
// Transition state from running to cancelling
b.runState = stateCancelling

if b.cancelChs == nil {
b.cancelChs = make([]chan<- bool, 0, 5)
}

done := make(chan bool)
b.cancelChs = append(b.cancelChs, done)
b.cancelCond.Broadcast()
b.l.Unlock()

// Wait until all of the cleanup hooks have been run
<-b.cancelDone
<-done
}
37 changes: 26 additions & 11 deletions basic_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package multistep
import (
"reflect"
"testing"
"time"
)

func TestBasicRunner_ImplRunner(t *testing.T) {
Expand Down Expand Up @@ -76,18 +77,36 @@ func TestBasicRunner_Run_Halt(t *testing.T) {
}

func TestBasicRunner_Cancel(t *testing.T) {
ch := make(chan chan bool)
data := make(map[string]interface{})
stepA := &TestStepAcc{Data: "a"}
stepB := &TestStepAcc{Data: "b"}
sync := make(chan struct{})
stepInt := &TestStepSync{C: sync}
stepInt := &TestStepSync{ch}
stepC := &TestStepAcc{Data: "c"}

r := &BasicRunner{Steps: []Step{stepA, stepB, stepInt, stepC}}
go r.Run(data)

sync <- struct{}{} // continue stepInt
r.Cancel()
// Wait until we reach the sync point
responseCh := <-ch

// Cancel then continue chain
cancelCh := make(chan bool)
go func() {
r.Cancel()
cancelCh <- true
}()

for {
if _, ok := data[StateCancelled]; ok {
responseCh <- true
break
}

time.Sleep(10 * time.Millisecond)
}

<-cancelCh

// Test run data
expected := []string{"a", "b"}
Expand All @@ -103,13 +122,9 @@ func TestBasicRunner_Cancel(t *testing.T) {
t.Errorf("unexpected result: %#v", results)
}

// Test that the sync cleanup had the cancelled flag
if _, ok := data["sync_cancelled"]; !ok {
t.Errorf("sync cleanup not cancelled")
}

// Test that it says it was cancelled
if _, ok := data[StateCancelled]; !ok {
// Test that it says it is cancelled
cancelled := data[StateCancelled].(bool)
if !cancelled {
t.Errorf("not cancelled")
}
}
25 changes: 10 additions & 15 deletions multistep_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package multistep

import (
"runtime"
)

// A step for testing that accumuluates data into a string slice in the
// the state bag. It always uses the "data" key in the state bag, and will
// initialize it.
Expand All @@ -15,6 +11,11 @@ type TestStepAcc struct {
Halt bool
}

// A step that syncs by sending a channel and expecting a response.
type TestStepSync struct {
Ch chan chan bool
}

func (s TestStepAcc) Run(state map[string]interface{}) StepAction {
s.insertData(state, "data")

Expand All @@ -39,18 +40,12 @@ func (s TestStepAcc) insertData(state map[string]interface{}, key string) {
state[key] = data
}

type TestStepSync struct {
C <-chan struct{}
}

func (s TestStepSync) Run(map[string]interface{}) StepAction {
<-s.C
runtime.Gosched() // ensure that any calls to cancel have a chance to run
ch := make(chan bool)
s.Ch <- ch
<-ch

return ActionContinue
}

func (s TestStepSync) Cleanup(state map[string]interface{}) {
if _, ok := state[StateCancelled]; ok {
state["sync_cancelled"] = true
}
}
func (s TestStepSync) Cleanup(map[string]interface{}) {}

0 comments on commit da6303b

Please sign in to comment.