Skip to content

Commit

Permalink
correct aborting of heartbeat goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
seletskiy committed Jun 8, 2016
1 parent a6f719a commit eabe4de
Show file tree
Hide file tree
Showing 22 changed files with 216 additions and 72 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/lorg
/.last-testcase
/orgalorg
/.cover
/coverage
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
[submodule "vendor/github.com/reconquest/containers.bash"]
path = vendor/github.com/reconquest/containers.bash
url = https://github.com/reconquest/containers.bash
[submodule "vendor/github.com/reconquest/go-test.bash"]
path = vendor/github.com/reconquest/go-test.bash
url = https://github.com/reconquest/go-test.bash
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include vendor/github.com/reconquest/test-runner.bash/Makefile
include vendor/github.com/reconquest/go-test.bash/Makefile
12 changes: 10 additions & 2 deletions distributed_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"strings"
"sync"
"time"

"github.com/seletskiy/hierr"
Expand Down Expand Up @@ -44,6 +45,8 @@ func (lock *distributedLock) acquire(filename string) error {
`failed to acquire lock, but continuing execution`,
).Error(),
)

continue
}

nodes := []string{}
Expand All @@ -63,8 +66,13 @@ func (lock *distributedLock) acquire(filename string) error {
return nil
}

func (lock *distributedLock) runHeartbeats(period time.Duration) {
func (lock *distributedLock) runHeartbeats(
period time.Duration,
canceler *sync.Cond,
) {
for _, node := range lock.nodes {
go heartbeat(period, node)
if node.connection != nil {
go heartbeat(period, node, canceler)
}
}
}
78 changes: 72 additions & 6 deletions heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,102 @@ import (
"bufio"
"io"
"strings"
"sync"
"time"

"github.com/seletskiy/hierr"
"github.com/theairkit/runcmd"
)

const (
heartbeatPing = "PING"
)

func heartbeat(period time.Duration, node *distributedLockNode) {
func heartbeat(
period time.Duration,
node *distributedLockNode,
canceler *sync.Cond,
) {
abort := make(chan struct{}, 0)

go func() {
canceler.L.Lock()
canceler.Wait()
canceler.L.Unlock()

abort <- struct{}{}
}()

finish := func(code int) {
canceler.L.Lock()
canceler.Broadcast()
canceler.L.Unlock()

<-abort

if remote, ok := node.runner.(*runcmd.Remote); ok {
tracef("%s closing connection", node.String())
err := remote.CloseConnection()
if err != nil {
warningf(
"%s",
hierr.Errorf(
err,
"%s error while closing connection",
node.String(),
),
)
}
}

exit(code)
}

ticker := time.Tick(period)

for {
_, err := io.WriteString(node.connection.stdin, heartbeatPing+"\n")
if err != nil {
logger.Fatal(hierr.Errorf(err, `can't send heartbeat`))
logger.Error(
hierr.Errorf(
err,
`%s can't send heartbeat`,
node.String(),
).Error(),
)

finish(2)
}

<-ticker
select {
case <-abort:
return

case <-ticker:
// pass
}

ping, err := bufio.NewReader(node.connection.stdout).ReadString('\n')
if err != nil {
logger.Fatal(hierr.Errorf(err, `can't receive heartbeat`))
logger.Error(
hierr.Errorf(
err,
`%s can't receive heartbeat`,
node.String(),
),
)

finish(2)
}

if strings.TrimSpace(ping) != heartbeatPing {
logger.Fatalf(
`received unexpected heartbeat ping: '%s'`,
logger.Errorf(
`%s received unexpected heartbeat ping: '%s'`,
node.String(),
ping,
)

finish(2)
}

tracef(`%s heartbeat`, node.String())
Expand Down
5 changes: 4 additions & 1 deletion lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ func acquireDistributedLock(
lockFile string,
runnerFactory runnerFactory,
addresses []address,
noLockFail bool,
) (*distributedLock, error) {
var (
lock = &distributedLock{}
lock = &distributedLock{
noFail: noLockFail,
}

nodeIndex = int64(0)
errors = make(chan error, 0)
Expand Down
40 changes: 28 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Options:
By default, orgalorg will not obtain root and do
all actions from specified user. To change that
behaviour, this option can be used.
-t --no-lock-abort Try to obtain global lock, but only print warning if
-t --no-lock-fail Try to obtain global lock, but only print warning if
it cannot be done, do not stop execution.
-r --root <root> Specify root dir to extract files into.
[default: /var/run/orgalorg/files/$RUNID]
Expand Down Expand Up @@ -172,14 +172,20 @@ var (
verbose = verbosityNormal
)

var (
exit = os.Exit
)

func main() {
logger.SetFormat(lorg.NewFormat("* ${time} ${level:[%s]:right} %s"))
currentUser, err := user.Current()
if err != nil {
logger.Fatal(hierr.Errorf(
logger.Error(hierr.Errorf(
err,
`can't get current user`,
))

exit(1)
}

usage := strings.Replace(usage, "$USER", currentUser.Username, -1)
Expand Down Expand Up @@ -213,7 +219,9 @@ func main() {
}

if err != nil {
logger.Fatal(err)
logger.Error(err)

exit(1)
}
}

Expand All @@ -229,7 +237,9 @@ func command(args map[string]interface{}) error {
commandToRun = append([]string{"sudo", "-n"}, commandToRun...)
}

cluster, err := connectAndLock(args)
canceler := sync.NewCond(&sync.Mutex{})

cluster, err := connectAndLock(args, canceler)
if err != nil {
return err
}
Expand Down Expand Up @@ -329,19 +339,21 @@ func synchronize(args map[string]interface{}) error {
tracef(`files to upload: %+v`, filesList)
}

cluster, err := connectAndLock(args)
canceler := sync.NewCond(&sync.Mutex{})

cluster, err := connectAndLock(args, canceler)
if err != nil {
return err
}

if lockOnly {
warningf("-L|--lock was passed, waiting for interrupt...")

wait := sync.WaitGroup{}
wait.Add(1)
wait.Wait()
canceler.L.Lock()
canceler.Wait()
canceler.L.Unlock()

os.Exit(0)
exit(0)
}

err = upload(args, cluster, filesList)
Expand Down Expand Up @@ -441,11 +453,14 @@ func upload(
return nil
}

func connectAndLock(args map[string]interface{}) (*distributedLock, error) {
func connectAndLock(
args map[string]interface{},
canceler *sync.Cond,
) (*distributedLock, error) {
var (
lockFile = args["--lock-file"].(string)
sendTimeout = args["--send-timeout"].(string)
noLockFail = args["--no-lock-abort"].(bool)
noLockFail = args["--no-lock-fail"].(bool)
)

addresses, err := parseAddresses(args)
Expand Down Expand Up @@ -487,7 +502,8 @@ func connectAndLock(args map[string]interface{}) (*distributedLock, error) {
cluster.runHeartbeats(
time.Duration(
float64(heartbeatMilliseconds)*heartbeatTimeoutCoefficient,
) * time.Millisecond,
)*time.Millisecond,
canceler,
)

return cluster, nil
Expand Down
14 changes: 11 additions & 3 deletions run_tests
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import "github.com/reconquest/containers"
import "github.com/reconquest/progress"
import "github.com/reconquest/test-runner"
import "github.com/reconquest/tests.sh"
import "github.com/reconquest/go-test"

include tests/ssh.sh
include tests/build.sh
Expand Down Expand Up @@ -52,13 +53,18 @@ test-runner:handle-custom-opt() {
progress:spinner:new _progress_spinner

test-runner:progress() {
progress:spinner:spin "$_progress_spinner" > /dev/null
if [ "${1:-}" = "stop" ]; then
printf " ok."
else
progress:spinner:spin "$_progress_spinner" > /dev/null
fi
}

:init() {
:build:init
go-test:set-output-dir "$(readlink -f .)"
go-test:build orgalorg

hastur:init openssh,pam,util-linux,tar,iproute2,sudo,sed
hastur:init openssh,pam,util-linux,tar,iproute2,sudo,sed,procps-ng
}

:cleanup() {
Expand All @@ -67,6 +73,8 @@ test-runner:progress() {
hastur:destroy-root

progress:spinner:stop "$_progress_spinner"

go-test:merge-coverage
}

:init 2> >(progress:spinner:spin "$_progress_spinner" > /dev/null)
Expand Down
3 changes: 2 additions & 1 deletion tests/orgalorg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ orgalorg_user="orgalorg"
:orgalorg:with-key() {
tests:debug "!!! connecting to hosts: ${ips[@]}"

orgalorg -u $orgalorg_user ${ips[*]/#/-o} -k "$(:ssh:get-key)" "${@}"
go-test:run orgalorg \
-u $orgalorg_user ${ips[*]/#/-o} -k "$(:ssh:get-key)" "${@}"
}
2 changes: 2 additions & 0 deletions tests/setup.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
go-test:set-prefix "$(tests:print-current-testcase | sed 's/\W/_/g')-"

:bootstrap-container() {
local container_name="$1"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
xargs -n1 <<< "${ips[@]}" | tests:put hosts

tests:ensure :orgalorg:with-key -o ./hosts -C echo hello '|' wc -l

tests:assert-stdout "$(containers:count)"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
tests:not tests:ensure :orgalorg:with-key -e -C exit 17

tests:assert-stderr "failed to evaluate"
tests:assert-stderr "non-zero code: 17"
16 changes: 2 additions & 14 deletions tests/testcases/locking/can-acquire-global-lock.test.sh
Original file line number Diff line number Diff line change
@@ -1,22 +1,10 @@
orgalorg_output="$(tests:get-tmp-dir)/oralorg.stdout"
tests:involve tests/testcases/locking/lock.sh

tests:run-background orgalorg \
tests:silence tests:pipe \
:orgalorg:with-key --lock '2>&1' '|' tee $orgalorg_output

while ! cat "$orgalorg_output" 2>/dev/null | grep -qF "waiting for interrupt"
do
tests:debug "[orgalorg] waiting for global lock..."
sleep 0.1
done

tests:debug "[orgalorg] global lock has been acquired"
:orgalorg:lock orgalorg_output orgalorg_pid

tests:not tests:ensure :orgalorg:with-key --lock
tests:assert-stderr "lock already"

orgalorg_pid=$(tests:get-background-pid "$orgalorg")

pkill -INT -P "$orgalorg_pid"

_exited_with_ctrl_c=130
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
tests:involve tests/testcases/locking/lock.sh

:orgalorg:lock orgalorg_output orgalorg_pid --send-timeout=2000

tests:wait-file-changes "$orgalorg_output" 0.1 10 \
:ssh "${ips[0]}" pkill -f flock

tests:ensure grep -q "ERROR.*${ips[0]}.*heartbeat" "$orgalorg_output"
Loading

0 comments on commit eabe4de

Please sign in to comment.