Skip to content

Commit

Permalink
Merge pull request hashicorp#1592 from hashicorp/b-reap
Browse files Browse the repository at this point in the history
Adds support for the reap lock.
  • Loading branch information
slackpad committed Jan 13, 2016
2 parents 336d5b6 + 343838f commit 71e3901
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 5 deletions.
9 changes: 9 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ type Agent struct {
// agent methods use this, so use with care and never override
// outside of a unit test.
endpoints map[string]string

// reapLock is used to prevent child process reaping from interfering
// with normal waiting for subprocesses to complete. Any time you exec
// and wait, you should take a read lock on this mutex. Only the reaper
// takes the write lock. This setup prevents us from serializing all the
// child process management with each other, it just serializes them
// with the child process reaper.
reapLock sync.RWMutex
}

// Create is used to create a new Agent. Returns
Expand Down Expand Up @@ -949,6 +957,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
Script: chkType.Script,
Interval: chkType.Interval,
Logger: a.logger,
ReapLock: &a.reapLock,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
Expand Down
7 changes: 7 additions & 0 deletions command/agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type CheckMonitor struct {
Script string
Interval time.Duration
Logger *log.Logger
ReapLock *sync.RWMutex

stop bool
stopCh chan struct{}
Expand Down Expand Up @@ -146,6 +147,12 @@ func (c *CheckMonitor) run() {

// check is invoked periodically to perform the script check
func (c *CheckMonitor) check() {
// Disable child process reaping so that we can get this command's
// return value. Note that we take the read lock here since we are
// waiting on a specific PID and don't need to serialize all waits.
c.ReapLock.RLock()
defer c.ReapLock.RUnlock()

// Create the command
cmd, err := ExecScript(c.Script)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions command/agent/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http/httptest"
"os"
"os/exec"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -42,6 +43,7 @@ func expectStatus(t *testing.T, script, status string) {
Script: script,
Interval: 10 * time.Millisecond,
Logger: log.New(os.Stderr, "", log.LstdFlags),
ReapLock: &sync.RWMutex{},
}
check.Start()
defer check.Stop()
Expand Down Expand Up @@ -90,6 +92,7 @@ func TestCheckMonitor_RandomStagger(t *testing.T) {
Script: "exit 0",
Interval: 25 * time.Millisecond,
Logger: log.New(os.Stderr, "", log.LstdFlags),
ReapLock: &sync.RWMutex{},
}
check.Start()
defer check.Stop()
Expand Down Expand Up @@ -118,6 +121,7 @@ func TestCheckMonitor_LimitOutput(t *testing.T) {
Script: "od -N 81920 /dev/urandom",
Interval: 25 * time.Millisecond,
Logger: log.New(os.Stderr, "", log.LstdFlags),
ReapLock: &sync.RWMutex{},
}
check.Start()
defer check.Stop()
Expand Down
6 changes: 3 additions & 3 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func (c *Command) Run(args []string) int {
}
}
}()
go reap.ReapChildren(pids, errors, c.agent.shutdownCh)
go reap.ReapChildren(pids, errors, c.agent.shutdownCh, &c.agent.reapLock)
}
}

Expand Down Expand Up @@ -709,7 +709,7 @@ func (c *Command) Run(args []string) int {
// Register the watches
for _, wp := range config.WatchPlans {
go func(wp *watch.WatchPlan) {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"], &c.agent.reapLock)
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
Expand Down Expand Up @@ -896,7 +896,7 @@ func (c *Command) handleReload(config *Config) *Config {
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func(wp *watch.WatchPlan) {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"], &c.agent.reapLock)
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
Expand Down
6 changes: 6 additions & 0 deletions command/agent/remote_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) {
return
}

// Disable child process reaping so that we can get this command's
// return value. Note that we take the read lock here since we are
// waiting on a specific PID and don't need to serialize all waits.
a.reapLock.RLock()
defer a.reapLock.RUnlock()

// Ensure we write out an exit code
exitCode := 0
defer a.remoteExecWriteExitCode(&event, &exitCode)
Expand Down
9 changes: 8 additions & 1 deletion command/agent/watch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"os"
"strconv"
"sync"

"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
Expand All @@ -33,10 +34,16 @@ func verifyWatchHandler(params interface{}) error {
}

// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
func makeWatchHandler(logOutput io.Writer, params interface{}, reapLock *sync.RWMutex) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Disable child process reaping so that we can get this command's
// return value. Note that we take the read lock here since we are
// waiting on a specific PID and don't need to serialize all waits.
reapLock.RLock()
defer reapLock.RUnlock()

// Create the command
cmd, err := ExecScript(script)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion command/agent/watch_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"io/ioutil"
"os"
"sync"
"testing"
)

Expand All @@ -25,7 +26,7 @@ func TestMakeWatchHandler(t *testing.T) {
defer os.Remove("handler_out")
defer os.Remove("handler_index_out")
script := "echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out"
handler := makeWatchHandler(os.Stderr, script)
handler := makeWatchHandler(os.Stderr, script, &sync.RWMutex{})
handler(100, []string{"foo", "bar", "baz"})
raw, err := ioutil.ReadFile("handler_out")
if err != nil {
Expand Down

0 comments on commit 71e3901

Please sign in to comment.